# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import logging
import os
import threading
import time
import common
from autotest_lib.client.common_lib import utils
from autotest_lib.site_utils.lxc import base_image
from autotest_lib.site_utils.lxc import constants
from autotest_lib.site_utils.lxc import container_factory
from autotest_lib.site_utils.lxc import zygote
from autotest_lib.site_utils.lxc.constants import \
CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX
from autotest_lib.site_utils.lxc.container_pool import async_listener
from autotest_lib.site_utils.lxc.container_pool import error
from autotest_lib.site_utils.lxc.container_pool import message
from autotest_lib.site_utils.lxc.container_pool import pool
try:
import cPickle as pickle
except:
import pickle
try:
from chromite.lib import metrics
from infra_libs import ts_mon
except ImportError:
import mock
metrics = utils.metrics_mock
ts_mon = mock.Mock()
# The minimum period between polling for new connections, in seconds.  The
# Service event loop sleeps for this long at the end of every polling pass.
_MIN_POLLING_PERIOD = 0.1
class Service(object):
    """A manager for a pool of LXC containers.

    The Service class manages client communication with an underlying container
    pool.  It listens for incoming client connections, then spawns threads to
    deal with communication with each client.

    start() runs the event loop on the calling thread and blocks until a stop
    has been requested.  stop() may be called from other threads (client
    threads call it when they handle a SHUTDOWN message).
    """

    def __init__(self, host_dir, pool=None):
        """Sets up a new container pool service.

        @param host_dir: A SharedHostDir.  This will be used for Zygote
                         configuration as well as for general pool operation
                         (e.g. opening linux domain sockets for communication).
        @param pool: (for testing) A container pool that the service will
                     maintain.  This parameter exists for DI, for testing.
                     Under normal circumstances the service instantiates the
                     container pool internally.
        """
        # Create socket for receiving container pool requests.  This also acts
        # as a mutex, preventing multiple container pools from being
        # instantiated.
        self._socket_path = os.path.join(
                host_dir.path, constants.DEFAULT_CONTAINER_POOL_SOCKET)
        self._connection_listener = async_listener.AsyncListener(
                self._socket_path)
        self._client_threads = []
        # None while no stop has been requested; otherwise the Event that is
        # set once shutdown has completed.
        self._stop_event = None
        # Guards _stop_event: stop() is called from client threads while
        # start() consumes the event on the thread running the event loop.
        self._stop_lock = threading.Lock()
        self._running = False
        self._pool = pool

    def start(self, pool_size=constants.DEFAULT_CONTAINER_POOL_SIZE):
        """Starts the service.

        Blocks, running the event loop, until stop() is called.  On the way out
        the client threads are asked to stop, the listener is closed, the pool
        is cleaned up, and the event returned by stop() is set.

        @param pool_size: The desired size of the container pool.  This
                          parameter has no effect if a pre-created pool was
                          DI'd into the Service constructor.

        @raise Exception: Any error raised while closing the connection
                          listener is logged and re-raised (after cleanup).
        """
        self._running = True

        # Start the container pool.
        if self._pool is None:
            factory = container_factory.ContainerFactory(
                    base_container=base_image.BaseImage().get(),
                    container_class=zygote.Zygote)
            self._pool = pool.Pool(factory=factory, size=pool_size)

        # Start listening asynchronously for incoming connections.
        self._connection_listener.start()

        # Poll for incoming connections, and spawn threads to handle them.
        logging.debug('Start event loop.')
        while self._stop_event is None:
            self._handle_incoming_connections()
            self._cleanup_closed_connections()
            # TODO(kenobi): Poll for and log errors from pool.
            metrics.Counter(METRICS_PREFIX + '/tick').increment()
            time.sleep(_MIN_POLLING_PERIOD)

        logging.debug('Exit event loop.')

        # Stopped - stop all the client threads, stop listening, then signal
        # that shutdown is complete.
        for thread in self._client_threads:
            thread.stop()

        try:
            self._connection_listener.close()
        except Exception as e:
            logging.error('Error stopping pool service: %r', e)
            raise
        finally:
            try:
                # Clean up the container pool.
                self._pool.cleanup()
            finally:
                # Always reset state and wake whoever is waiting on the stop
                # event, even if pool cleanup raised; otherwise a client
                # thread blocked on a SHUTDOWN request would wait forever.
                with self._stop_lock:
                    stop_event = self._stop_event
                    self._stop_event = None
                    self._running = False
                # Signal only after the state above is consistent.
                if stop_event is not None:
                    stop_event.set()
            metrics.Counter(METRICS_PREFIX + '/service_stopped').increment()
            logging.debug('Container pool stopped.')

    def stop(self):
        """Stops the service.

        Only requests the stop; the event loop in start() notices the request
        on its next pass.  Calling stop() again while a stop is already pending
        returns the same event instead of replacing it, so that every caller
        waiting on the returned event is woken when shutdown completes.

        @return: A threading.Event that is set once start() has finished
                 shutting down.
        """
        with self._stop_lock:
            if self._stop_event is None:
                self._stop_event = threading.Event()
            return self._stop_event

    def is_running(self):
        """Returns whether or not the service is currently running."""
        return self._running

    def get_status(self):
        """Returns a dictionary of values describing the current status.

        @return: A dict that always has 'running' and 'socket_path' entries.
                 While the service is running it also carries the pool
                 capacity, size, worker count and error count, and the number
                 of client threads.
        """
        status = {}
        status['running'] = self._running
        status['socket_path'] = self._socket_path
        if self._running:
            status['pool capacity'] = self._pool.capacity
            status['pool size'] = self._pool.size
            status['pool worker count'] = self._pool.worker_count
            status['pool errors'] = self._pool.errors.qsize()
            status['client thread count'] = len(self._client_threads)
        return status

    def _handle_incoming_connections(self):
        """Checks for connections, and spawn sub-threads to handle requests."""
        connection = self._connection_listener.get_connection()
        if connection is not None:
            # Spawn a thread to deal with the new connection.
            thread = _ClientThread(self, self._pool, connection)
            thread.start()
            self._client_threads.append(thread)
            thread_count = len(self._client_threads)
            metrics.Counter(METRICS_PREFIX + '/client_threads'
                            ).increment_by(thread_count)
            logging.debug('Client thread count: %d', thread_count)

    def _cleanup_closed_connections(self):
        """Cleans up dead client threads."""
        # We don't need to lock because all operations on self._client_threads
        # take place on the main thread.
        self._client_threads = [t for t in self._client_threads if t.is_alive()]
class _ClientThread(threading.Thread):
    """A class that handles communication with a pool client.

    Use a thread-per-connection model instead of select()/poll() for a few
    reasons:
    - the number of simultaneous clients is not expected to be high enough for
      select or poll to really pay off.
    - one thread per connection is more robust - if a single client somehow
      crashes its communication thread, that will not affect the other
      communication threads or the main pool service.
    """

    def __init__(self, service, pool, connection):
        """Sets up a thread that serves a single client connection.

        @param service: The pool service; used for status and shutdown
                        requests.
        @param pool: The container pool that GET requests are served from.
        @param connection: The client connection.  It must provide poll(),
                           recv(), send() and close().
        """
        super(_ClientThread, self).__init__(name='client_thread')
        self._service = service
        self._pool = pool
        self._connection = connection
        # True only while the event loop in run() is active.
        self._running = False
        # Set by stop().  A dedicated event (rather than clearing _running)
        # means a stop requested before run() gets going is not lost.
        self._stop_requested = threading.Event()

    def run(self):
        """Handles messages coming in from clients.

        The thread main loop monitors the connection and handles incoming
        messages.  Polling is used so that the loop condition can be checked
        regularly - this enables the thread to exit cleanly if required.

        Any kind of error will exit the event loop and close the connection.
        """
        logging.debug('Start event loop.')
        self._running = True
        try:
            while not self._stop_requested.is_set():
                # Poll and deal with messages every second.  The timeout
                # enables the thread to exit cleanly when stop() is called.
                if not self._connection.poll(1):
                    continue

                # NOTE(review): recv() appears to unpickle whatever the client
                # sent (see the exceptions handled below), which is only safe
                # if every client that can reach the socket is trusted -
                # confirm.
                try:
                    msg = self._connection.recv()
                except (AttributeError,
                        ImportError,
                        IndexError,
                        pickle.UnpicklingError) as e:
                    # All of these can occur while unpickling data.
                    logging.error('Error while receiving message: %r', e)
                    # Exit if an error occurs
                    break
                except EOFError:
                    # EOFError means the client closed the connection.  This
                    # is not an error - just exit.
                    break

                try:
                    response = self._handle_message(msg)
                    # Always send the response, even if it's None.  This
                    # provides more consistent client-side behaviour.
                    self._connection.send(response)
                except error.UnknownMessageTypeError as e:
                    # The message received was a valid python object, but
                    # not a valid Message.
                    logging.error('Message error: %s', e)
                    # Exit if an error occurs
                    break
                except EOFError:
                    # EOFError means the client closed the connection early.
                    # TODO(chromium:794685): Return container to pool.
                    logging.error('Client closed connection before return.')
                    break
        finally:
            # Always close the connection.
            self._running = False
            logging.debug('Exit event loop.')
            self._connection.close()

    def stop(self):
        """Stops the client thread.

        Only requests the stop; the event loop exits the next time it checks
        its loop condition.  A stop requested before run() has started makes
        run() skip the loop and just close the connection.
        """
        self._stop_requested.set()

    def _handle_message(self, msg):
        """Handles incoming messages.

        @param msg: The incoming message to be handled.

        @return: A pickleable object (or None) that should be sent back to the
                 client.

        @raise error.UnknownMessageTypeError: If msg is not a Message, or its
                                              type has no handler.
        """
        # Only handle Message objects.
        if not isinstance(msg, message.Message):
            raise error.UnknownMessageTypeError(
                    'Invalid message class %s' % type(msg))

        # Use a dispatch table to simulate switch/case.
        handlers = {
            message.ECHO: self._echo,
            message.GET: self._get,
            message.SHUTDOWN: self._shutdown,
            message.STATUS: self._status,
        }
        # Only the table lookup is guarded: a KeyError raised from inside a
        # handler is a genuine failure and must not be reported as an unknown
        # message type.
        try:
            handler = handlers[msg.type]
        except KeyError:
            raise error.UnknownMessageTypeError(
                    'Invalid message type %s' % msg.type)
        return handler(**msg.args)

    def _echo(self, msg):
        """Handles ECHO messages.

        @param msg: A string that will be echoed back to the client.

        @return: The message, for echoing back to the client.
        """
        # Just echo the message back, for testing aliveness.
        logging.debug('Echo: %r', msg)
        return msg

    def _shutdown(self):
        """Handles SHUTDOWN messages.

        @return: An ACK message.  This function is synchronous (i.e. it blocks,
                 and only returns the ACK when shutdown is complete).
        """
        logging.debug('Received shutdown request.')
        # Request shutdown.  Wait for the service to actually stop before
        # sending the response.
        self._service.stop().wait()
        logging.debug('Service shutdown complete.')
        return message.ack()

    def _status(self):
        """Handles STATUS messages.

        @return: The result of the service status call.
        """
        logging.debug('Received status request.')
        return self._service.get_status()

    def _get(self, id, timeout):
        """Gets a container from the pool.

        @param id: A ContainerId to assign to the new container.
        @param timeout: A timeout (in seconds) to wait for the pool.  If a
                        container is not available from the pool within the
                        given period, None will be returned.

        @return: A container from the pool.
        """
        logging.debug('Received get request (id=%s)', id)
        container = self._pool.get(timeout)
        # Assign an ID to the container as soon as it is removed from the pool.
        # This associates the container with the process to which it will be
        # handed off.
        if container is not None:
            logging.debug(
                'Assigning container (name=%s, id=%s)', container.name, id)
            container.id = id
        else:
            logging.debug('No container (id=%s)', id)
        # Record whether the request was satisfied.
        metrics.Counter(METRICS_PREFIX + '/container_requests',
                        field_spec=[ts_mon.BooleanField('success')]
                        ).increment(fields={'success': (container is not None)})
        return container