普通文本  |  193行  |  7.09 KB

# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import os
import socket
import sys
import threading
from contextlib import contextmanager
from multiprocessing import connection

import common
from autotest_lib.site_utils.lxc import constants
from autotest_lib.site_utils.lxc.container_pool import message


# Default server-side timeout in seconds; limits time to fetch container.
_SERVER_CONNECTION_TIMEOUT = 1
# Extra timeout to use on the client side; limits network communication time.
_CLIENT_CONNECTION_TIMEOUT = 5

class Client(object):
    """A class for communicating with a container pool service.

    The Client class enables clients to communicate with a running container
    pool service - for example, to query current status, or to obtain a new
    container.

    Here is an example usage:

    def status();
      client = Client(pool_address, timeout)
      print(client.get_status())
      client.close()

    In addition, the class provides a context manager for easier cleanup:

    def status():
      with Client.connect(pool_address, timeout) as client:
        print(client.get_status())
    """

    def __init__(self, address=None, timeout=_SERVER_CONNECTION_TIMEOUT):
        """Initializes a new Client object.

        @param address: The address of the pool to connect to.
        @param timeout: A connection timeout, in seconds.

        @raises socket.error: If some other miscelleneous socket error occurs
                              (e.g. if the socket does not exist)
        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        """
        if address is None:
            address = os.path.join(
                constants.DEFAULT_SHARED_HOST_PATH,
                constants.DEFAULT_CONTAINER_POOL_SOCKET)
        self._connection = _ConnectionHelper(address).connect(timeout)


    @classmethod
    @contextmanager
    def connect(cls, address, timeout):
        """A ContextManager for Client objects.

        @param address: The address of the pool's communication socket.
        @param timeout: A connection timeout, in seconds.

        @return: A Client connected to the domain socket on the given address.

        @raises socket.error: If some other miscelleneous socket error occurs
                              (e.g. if the socket does not exist)
        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        """
        client = Client(address, timeout)
        try:
            yield client
        finally:
            client.close()


    def close(self):
        """Closes the client connection."""
        self._connection.close()
        self._connection = None


    def get_container(self, id, timeout):
        """Retrieves a container from the pool service.

        @param id: A ContainerId to assign to the container.  Containers require
                   an ID when they are dissociated from the pool, so that they
                   can be tracked.
        @param timeout: A timeout (in seconds) to wait for the operation to
                        complete.  A timeout of 0 will return immediately if no
                        containers are available.

        @return: A container from the pool, when one becomes available, or None
                 if no containers were available within the specified timeout.
        """
        self._connection.send(message.get(id, timeout))
        # The service side guarantees that it always returns something
        # (i.e. a Container, or None) within the specified timeout period, or
        # to wait indefinitely if given None.
        # However, we don't entirely trust it and account for network problems.
        if timeout is None or self._connection.poll(
                timeout + _CLIENT_CONNECTION_TIMEOUT):
            return self._connection.recv()
        else:
            logging.debug('No container (id=%s). Connection failed.', id)
            return None


    def get_status(self):
        """Gets the status of the connected Pool."""
        self._connection.send(message.status())
        return self._connection.recv()


    def shutdown(self):
        """Stops the service."""
        self._connection.send(message.shutdown())
        # Wait for ack.
        self._connection.recv()


class _ConnectionHelper(threading.Thread):
    """Factory class for making client connections with a timeout.

    Instantiate this with an address, and call connect.  The factory will take
    care of polling for a connection.  If a connection is not established within
    a set period of time, the make_connction call will raise a socket.timeout
    exception instead of hanging indefinitely.
    """
    def __init__(self, address):
        super(_ConnectionHelper, self).__init__()
        # Use a daemon thread, so that if this thread hangs, it doesn't keep the
        # parent thread alive.  All daemon threads die when the parent process
        # dies.
        self.daemon = True
        self._address = address
        self._client = None
        self._exc_info = None


    def run(self):
        """Instantiates a connection.Client."""
        try:
            logging.debug('Attempting connection to %s', self._address)
            self._client = connection.Client(self._address)
            logging.debug('Connection to %s successful', self._address)
        except Exception:
            self._exc_info = sys.exc_info()


    def connect(self, timeout):
        """Attempts to create a connection.Client with a timeout.

        Every 5 seconds a warning will be logged for debugging purposes.  After
        the timeout expires, the function will raise a socket.timout error.

        @param timeout: A connection timeout, in seconds.

        @return: A connection.Client connected using the address that was
                 specified when this factory was created.

        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        """
        # Start the thread, which attempts to open the connection.
        self.start()
        # Poll approximately once a second, so clients don't wait forever.
        # Range starts at 1 for readability (so the message below doesn't say 0
        # seconds).
        # Range ends at timeout+2 so that a timeout of 0 results in at least 1
        # try.
        for i in range(1, timeout + 2):
            self.join(1)
            if self._exc_info is not None:
                raise self._exc_info[0], self._exc_info[1], self._exc_info[2]
            elif self._client is not None:
                return self._client

            # Log a warning when we first detect a potential problem, then every
            # 5 seconds after that.
            if i < 3 or i % 5 == 0:
                logging.warning(
                    'Test client failed to connect after %s seconds.', i)
        # Still no connection - time out.
        raise socket.timeout('Test client timed out waiting for connection.')