#!/usr/bin/env python
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import queue
import socket
import threading
import unittest
from host_controller.tradefed import remote_client
from host_controller.tradefed import remote_operation
class MockRemoteManagerThread(threading.Thread):
    """A thread which mocks remote manager.

    The thread takes response strings from a queue. For each of them it
    accepts one client connection, reads the request and sends the response
    back. A None entry in the queue terminates the thread.

    Attributes:
        HOST: Local host name.
        PORT: The port that the remote manager listens to.
        _remote_mgr_socket: The remote manager socket; None once closed.
        _response_queue: A queue.Queue object containing the response strings.
        _timeout: Socket timeout in seconds.
        last_error: The exception which caused this thread to terminate.
    """

    HOST = remote_client.LOCALHOST
    PORT = 32123

    def __init__(self, timeout):
        """Creates and listens to remote manager socket.

        Args:
            timeout: Socket timeout in seconds, applied to the listening
                     socket and to every accepted connection.

        Raises:
            socket.error: if the socket cannot be configured, bound or put
                          into listening mode. The socket is closed first.
        """
        super(MockRemoteManagerThread, self).__init__()
        self._response_queue = queue.Queue()
        self._timeout = timeout
        self.last_error = None
        self._remote_mgr_socket = socket.socket()
        try:
            # Every instance binds the same fixed port, so allow the address
            # to be reused right after a previous instance has been closed.
            self._remote_mgr_socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self._remote_mgr_socket.settimeout(self._timeout)
            self._remote_mgr_socket.bind((self.HOST, self.PORT))
            self._remote_mgr_socket.listen(1)
        except socket.error:
            # Release the descriptor, then report the real cause instead of
            # letting a later accept() fail on a closed socket.
            self.CloseSocket()
            raise

    def _Respond(self, response_str):
        """Accepts a client connection and responds.

        Args:
            response_str: The response string. Text is encoded as UTF-8
                          before it is sent; bytes are sent unchanged.

        Raises:
            socket.error: if accepting, receiving or sending fails.
        """
        server_socket, _ = self._remote_mgr_socket.accept()
        try:
            server_socket.settimeout(self._timeout)
            # Receive until connection is closed: recv() returns an empty
            # string at end of stream, which ends the loop.
            # NOTE(review): assumes the client shuts down its sending side
            # after writing the request -- confirm against remote_client.
            while server_socket.recv(4096):
                pass
            if not isinstance(response_str, bytes):
                response_str = response_str.encode("utf-8")
            # sendall() keeps writing until the whole response is sent;
            # send() may transmit only a part of it.
            server_socket.sendall(response_str)
        finally:
            server_socket.close()

    def AddResponse(self, response_str):
        """Add a response string to the queue.

        Args:
            response_str: The response string, or None to make the thread
                          exit once it reaches this entry.
        """
        self._response_queue.put_nowait(response_str)

    def CloseSocket(self):
        """Closes the remote manager socket.

        Calling this method more than once is harmless.
        """
        # Detach the socket before closing it so that a second caller sees
        # None instead of a socket that is being closed.
        remote_mgr_socket = self._remote_mgr_socket
        self._remote_mgr_socket = None
        if remote_mgr_socket:
            remote_mgr_socket.close()

    # @Override
    def run(self):
        """Sends the queued responses to the clients.

        Runs until None is taken from the queue or an exception is raised.
        The exception, if any, is stored in last_error. The remote manager
        socket is closed on exit.
        """
        try:
            while True:
                response_str = self._response_queue.get()
                self._response_queue.task_done()
                if response_str is None:
                    break
                self._Respond(response_str)
        except Exception as e:  # pylint: disable=broad-except
            # Keep whatever terminated the thread, not only socket errors,
            # so that the owner of the thread can inspect or re-raise it.
            self.last_error = e
        finally:
            self.CloseSocket()
class RemoteClientTest(unittest.TestCase):
    """Unit test of remote_client.RemoteClient using a mock remote manager.

    Attributes:
        _remote_mgr_thread: The MockRemoteManagerThread that serves responses.
        _client: The remote_client.RemoteClient under test.
    """

    def setUp(self):
        """Starts the mock remote manager thread and creates the client."""
        mock_thread = MockRemoteManagerThread(5)
        mock_thread.daemon = True
        mock_thread.start()
        self._remote_mgr_thread = mock_thread
        self._client = remote_client.RemoteClient(
            mock_thread.HOST, mock_thread.PORT, 5)

    def tearDown(self):
        """Stops the mock remote manager thread and reports its error."""
        mock_thread = self._remote_mgr_thread
        # None tells the thread to leave its response loop.
        mock_thread.AddResponse(None)
        mock_thread.join(15)
        mock_thread.CloseSocket()
        self.assertFalse(mock_thread.is_alive(),
                         "Cannot stop remote manager thread.")
        error = mock_thread.last_error
        if error:
            raise error

    def testListDevice(self):
        """Sends a ListDevices operation and expects no exception."""
        add_response = self._remote_mgr_thread.AddResponse
        add_response('{"serials": []}')
        self._client.ListDevices()

    def testAddCommand(self):
        """Sends an AddCommand operation and expects no exception."""
        add_response = self._remote_mgr_thread.AddResponse
        add_response('{}')
        operation = remote_operation.AddCommand(0, "COMMAND")
        self._client.SendOperation(operation)

    def testMultipleOperations(self):
        """Sends two operations through a single connection."""
        add_response = self._remote_mgr_thread.AddResponse
        add_response('{}\n{}')
        first = remote_operation.ListDevices()
        second = remote_operation.ListDevices()
        self._client.SendOperations(first, second)

    def testExecuteCommand(self):
        """Executes a command and polls for its result."""
        add_response = self._remote_mgr_thread.AddResponse
        client = self._client
        serial = "serial123"

        add_response('{}')
        client.SendOperation(remote_operation.AllocateDevice(serial))

        add_response('{}')
        client.SendOperation(
            remote_operation.ExecuteCommand(
                serial, "vts", "-m", "SampleShellTest"))

        # The command is still executing when the short wait expires.
        add_response('{"status": "EXECUTING"}')
        result = client.WaitForCommandResult(
            serial, timeout=0.5, poll_interval=1)
        self.assertIsNone(
            result, "Client returns result before command finishes.")

        # The second poll reports that the invocation has finished.
        add_response('{"status": "EXECUTING"}')
        add_response('{"status": "INVOCATION_SUCCESS"}')
        result = client.WaitForCommandResult(
            serial, timeout=5, poll_interval=1)

        add_response('{}')
        client.SendOperation(remote_operation.FreeDevice(serial))
        self.assertIsNotNone(result, "Client doesn't return command result.")

    def testSocketError(self):
        """Expects socket exceptions when the remote manager is silent."""
        # No response is queued, so the client waits until it times out.
        with self.assertRaises(socket.timeout):
            self._client.ListDevices()
        # None stops the mock thread before the client tries again.
        self._remote_mgr_thread.AddResponse(None)
        with self.assertRaises(socket.error):
            self._client.ListDevices()

    def testRemoteOperationException(self):
        """Expects RemoteOperationException for an error response."""
        self._remote_mgr_thread.AddResponse('{"error": "unit test"}')
        with self.assertRaises(remote_operation.RemoteOperationException):
            self._client.ListDevices()
if __name__ == "__main__":
    # Run all test cases of this module when it is executed as a script.
    unittest.main()