# Copyright 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import socket
import sys
import threading
from contextlib import contextmanager
from multiprocessing import connection
import common
from autotest_lib.site_utils.lxc import constants
from autotest_lib.site_utils.lxc.container_pool import message
# Default timeout, in seconds, used by Client.__init__ when the caller does not
# supply one.  Described by the original author as the server-side timeout that
# limits the time taken to fetch a container.
_SERVER_CONNECTION_TIMEOUT = 1
# Extra time, in seconds, that Client.get_container adds on top of the caller's
# timeout when polling for the service's reply; it limits the time allowed for
# network communication.
_CLIENT_CONNECTION_TIMEOUT = 5
class Client(object):
    """A class for communicating with a container pool service.

    The Client class enables clients to communicate with a running container
    pool service - for example, to query current status, or to obtain a new
    container.

    Here is an example usage:

    def status():
      client = Client(pool_address, timeout)
      print(client.get_status())
      client.close()

    In addition, the class provides a context manager for easier cleanup:

    def status():
      with Client.connect(pool_address, timeout) as client:
        print(client.get_status())
    """

    def __init__(self, address=None, timeout=_SERVER_CONNECTION_TIMEOUT):
        """Initializes a new Client object.

        @param address: The address of the pool to connect to.  If None, the
                        default container pool socket under the default shared
                        host path is used.
        @param timeout: A connection timeout, in seconds.

        @raises socket.error: If some other miscellaneous socket error occurs
                              (e.g. if the socket does not exist)
        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        """
        if address is None:
            address = os.path.join(
                    constants.DEFAULT_SHARED_HOST_PATH,
                    constants.DEFAULT_CONTAINER_POOL_SOCKET)
        self._connection = _ConnectionHelper(address).connect(timeout)


    @classmethod
    @contextmanager
    def connect(cls, address, timeout):
        """A ContextManager for Client objects.

        The client is closed when the `with` block exits, whether normally or
        through an exception.

        @param address: The address of the pool's communication socket.
        @param timeout: A connection timeout, in seconds.

        @return: A Client connected to the domain socket on the given address.

        @raises socket.error: If some other miscellaneous socket error occurs
                              (e.g. if the socket does not exist)
        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        """
        # Instantiate via cls (not the hard-coded base class) so that
        # subclasses get an instance of their own type.
        client = cls(address, timeout)
        try:
            yield client
        finally:
            client.close()


    def close(self):
        """Closes the client connection.

        Calling close() on an already-closed client is a no-op, so it is safe
        to close explicitly inside a `with Client.connect(...)` block.
        """
        if self._connection is None:
            return
        self._connection.close()
        self._connection = None


    def get_container(self, id, timeout):
        """Retrieves a container from the pool service.

        @param id: A ContainerId to assign to the container.  Containers require
                   an ID when they are dissociated from the pool, so that they
                   can be tracked.
        @param timeout: A timeout (in seconds) to wait for the operation to
                        complete.  A timeout of 0 will return immediately if no
                        containers are available.  A timeout of None waits
                        indefinitely for the reply.

        @return: A container from the pool, when one becomes available, or None
                 if no containers were available within the specified timeout.
        """
        self._connection.send(message.get(id, timeout))
        # The service side guarantees that it always returns something
        # (i.e. a Container, or None) within the specified timeout period, or
        # to wait indefinitely if given None.
        # However, we don't entirely trust it and account for network problems
        # by bounding the wait on the client side as well.
        if timeout is None or self._connection.poll(
                timeout + _CLIENT_CONNECTION_TIMEOUT):
            return self._connection.recv()

        # NOTE(review): if the service replies after this point, the late reply
        # stays queued on the connection and would be read by the next recv()
        # call - confirm whether callers reuse the client after a failure.
        logging.debug('No container (id=%s). Connection failed.', id)
        return None


    def get_status(self):
        """Gets the status of the connected Pool.

        @return: Whatever the service sends back in response to a status
                 message.
        """
        self._connection.send(message.status())
        return self._connection.recv()


    def shutdown(self):
        """Stops the service.

        Blocks until the service acknowledges the shutdown message; the
        acknowledgement itself is discarded.
        """
        self._connection.send(message.shutdown())
        # Wait for ack.
        self._connection.recv()
class _ConnectionHelper(threading.Thread):
    """Factory class for making client connections with a timeout.

    Instantiate this with an address, and call connect.  The factory will take
    care of polling for a connection.  If a connection is not established within
    a set period of time, the connect call will raise a socket.timeout
    exception instead of hanging indefinitely.

    Each instance wraps a single thread and can therefore only be used for one
    connect() call.
    """

    def __init__(self, address):
        """Initializes the helper.

        @param address: The address that connect() will try to connect to.
        """
        super(_ConnectionHelper, self).__init__()

        # Use a daemon thread, so that if this thread hangs, it doesn't keep the
        # parent thread alive.  All daemon threads die when the parent process
        # dies.
        self.daemon = True
        self._address = address
        # Results of the connection attempt, filled in by run(): exactly one of
        # these is expected to become non-None once the attempt finishes.
        self._client = None
        self._exc_info = None


    def run(self):
        """Instantiates a connection.Client.

        Runs on the helper thread.  On success the connection is stored in
        self._client; on failure the exception info is stored in
        self._exc_info so that connect() can re-raise it on the calling thread.
        """
        try:
            logging.debug('Attempting connection to %s', self._address)
            self._client = connection.Client(self._address)
            logging.debug('Connection to %s successful', self._address)
        except Exception:
            self._exc_info = sys.exc_info()


    @staticmethod
    def _reraise(exc_info):
        """Re-raises a captured exception, preserving its original traceback.

        The three-argument raise statement is a syntax error on Python 3, and
        with_traceback does not exist on Python 2, so the Python 2 form is kept
        in a string that is only evaluated on Python 2.

        @param exc_info: A (type, value, traceback) tuple from sys.exc_info().
        """
        exc_type, exc_value, exc_tb = exc_info
        if sys.version_info[0] >= 3:
            raise exc_value.with_traceback(exc_tb)
        exec('raise exc_type, exc_value, exc_tb',
             {'exc_type': exc_type, 'exc_value': exc_value, 'exc_tb': exc_tb})


    def connect(self, timeout):
        """Attempts to create a connection.Client with a timeout.

        The connection attempt is polled roughly once a second.  A warning is
        logged after the first and second seconds without a connection, and
        then at every multiple of 5 seconds.  After the timeout expires, the
        function will raise a socket.timeout error.

        @param timeout: A connection timeout, in (whole) seconds.

        @return: A connection.Client connected using the address that was
                 specified when this factory was created.

        @raises socket.timeout: If the connection is not established before the
                                given timeout expires.
        @raises Exception: Any exception raised by the connection attempt
                           itself (e.g. socket.error) is re-raised here.
        """
        # Start the thread, which attempts to open the connection.
        self.start()

        # Poll approximately once a second, so clients don't wait forever.
        # Range starts at 1 for readability (so the message below doesn't say 0
        # seconds).
        # Range ends at timeout+2 so that a timeout of 0 results in at least 1
        # try.
        for i in range(1, timeout + 2):
            self.join(1)
            if self._exc_info is not None:
                self._reraise(self._exc_info)
            elif self._client is not None:
                return self._client

            # Log a warning when we first detect a potential problem, then every
            # 5 seconds after that.
            if i < 3 or i % 5 == 0:
                logging.warning(
                        'Test client failed to connect after %s seconds.', i)

        # Still no connection - time out.  The daemon thread is left running;
        # if it connects later, that connection is never handed to anyone.
        raise socket.timeout('Test client timed out waiting for connection.')