# Copyright 2017 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. import logging import os import socket import sys import threading from contextlib import contextmanager from multiprocessing import connection import common from autotest_lib.site_utils.lxc import constants from autotest_lib.site_utils.lxc.container_pool import message # Default server-side timeout in seconds; limits time to fetch container. _SERVER_CONNECTION_TIMEOUT = 1 # Extra timeout to use on the client side; limits network communication time. _CLIENT_CONNECTION_TIMEOUT = 5 class Client(object): """A class for communicating with a container pool service. The Client class enables clients to communicate with a running container pool service - for example, to query current status, or to obtain a new container. Here is an example usage: def status(); client = Client(pool_address, timeout) print(client.get_status()) client.close() In addition, the class provides a context manager for easier cleanup: def status(): with Client.connect(pool_address, timeout) as client: print(client.get_status()) """ def __init__(self, address=None, timeout=_SERVER_CONNECTION_TIMEOUT): """Initializes a new Client object. @param address: The address of the pool to connect to. @param timeout: A connection timeout, in seconds. @raises socket.error: If some other miscelleneous socket error occurs (e.g. if the socket does not exist) @raises socket.timeout: If the connection is not established before the given timeout expires. """ if address is None: address = os.path.join( constants.DEFAULT_SHARED_HOST_PATH, constants.DEFAULT_CONTAINER_POOL_SOCKET) self._connection = _ConnectionHelper(address).connect(timeout) @classmethod @contextmanager def connect(cls, address, timeout): """A ContextManager for Client objects. @param address: The address of the pool's communication socket. @param timeout: A connection timeout, in seconds. @return: A Client connected to the domain socket on the given address. @raises socket.error: If some other miscelleneous socket error occurs (e.g. if the socket does not exist) @raises socket.timeout: If the connection is not established before the given timeout expires. """ client = Client(address, timeout) try: yield client finally: client.close() def close(self): """Closes the client connection.""" self._connection.close() self._connection = None def get_container(self, id, timeout): """Retrieves a container from the pool service. @param id: A ContainerId to assign to the container. Containers require an ID when they are dissociated from the pool, so that they can be tracked. @param timeout: A timeout (in seconds) to wait for the operation to complete. A timeout of 0 will return immediately if no containers are available. @return: A container from the pool, when one becomes available, or None if no containers were available within the specified timeout. """ self._connection.send(message.get(id, timeout)) # The service side guarantees that it always returns something # (i.e. a Container, or None) within the specified timeout period, or # to wait indefinitely if given None. # However, we don't entirely trust it and account for network problems. if timeout is None or self._connection.poll( timeout + _CLIENT_CONNECTION_TIMEOUT): return self._connection.recv() else: logging.debug('No container (id=%s). Connection failed.', id) return None def get_status(self): """Gets the status of the connected Pool.""" self._connection.send(message.status()) return self._connection.recv() def shutdown(self): """Stops the service.""" self._connection.send(message.shutdown()) # Wait for ack. self._connection.recv() class _ConnectionHelper(threading.Thread): """Factory class for making client connections with a timeout. Instantiate this with an address, and call connect. The factory will take care of polling for a connection. If a connection is not established within a set period of time, the make_connction call will raise a socket.timeout exception instead of hanging indefinitely. """ def __init__(self, address): super(_ConnectionHelper, self).__init__() # Use a daemon thread, so that if this thread hangs, it doesn't keep the # parent thread alive. All daemon threads die when the parent process # dies. self.daemon = True self._address = address self._client = None self._exc_info = None def run(self): """Instantiates a connection.Client.""" try: logging.debug('Attempting connection to %s', self._address) self._client = connection.Client(self._address) logging.debug('Connection to %s successful', self._address) except Exception: self._exc_info = sys.exc_info() def connect(self, timeout): """Attempts to create a connection.Client with a timeout. Every 5 seconds a warning will be logged for debugging purposes. After the timeout expires, the function will raise a socket.timout error. @param timeout: A connection timeout, in seconds. @return: A connection.Client connected using the address that was specified when this factory was created. @raises socket.timeout: If the connection is not established before the given timeout expires. """ # Start the thread, which attempts to open the connection. self.start() # Poll approximately once a second, so clients don't wait forever. # Range starts at 1 for readability (so the message below doesn't say 0 # seconds). # Range ends at timeout+2 so that a timeout of 0 results in at least 1 # try. for i in range(1, timeout + 2): self.join(1) if self._exc_info is not None: raise self._exc_info[0], self._exc_info[1], self._exc_info[2] elif self._client is not None: return self._client # Log a warning when we first detect a potential problem, then every # 5 seconds after that. if i < 3 or i % 5 == 0: logging.warning( 'Test client failed to connect after %s seconds.', i) # Still no connection - time out. raise socket.timeout('Test client timed out waiting for connection.')