# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import Queue
import collections
import logging
import threading
import time
import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.site_utils.lxc.container_pool import error as lxc_error
from autotest_lib.site_utils.lxc.constants import \
CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX
try:
from chromite.lib import metrics
from infra_libs import ts_mon
except ImportError:
import mock
metrics = utils.metrics_mock
ts_mon = mock.Mock()
# The maximum number of concurrent workers.  Each worker is responsible for
# managing the creation of a single container.
# TODO(kenobi): This may need to be adjusted for different hosts (e.g. full
# vs quarter shards)
_MAX_CONCURRENT_WORKERS = 5
# Timeout (in seconds) for container creation.  After this amount of time,
# container creation tasks are abandoned and retried.
_CONTAINER_CREATION_TIMEOUT = 600
# The period (in seconds) affects the rate at which the monitor thread runs its
# event loop.  This drives a number of other factors, e.g. how long to wait for
# the thread to respond to shutdown requests.
_MIN_MONITOR_PERIOD = 0.1
# The maximum number of errors per hour.  Once this many errors have been
# recorded within the last hour, the monitor stops spawning new container
# creation workers until old errors expire.
_MAX_ERRORS_PER_HOUR = 200
class Pool(object):
    """A fixed-size pool of LXC containers.

    The Pool is given a container factory and a target size.  A background
    monitor thread keeps creating containers (through the factory) whenever
    the number of pooled containers drops below that size.

    To avoid overloading the host, the number of simultaneous container
    creations is limited to _MAX_CONCURRENT_WORKERS.

    Errors raised during container creation are collected rather than
    propagated (see Pool.errors).  It is the client's responsibility to
    periodically check and empty out the error queue.
    """

    def __init__(self, factory, size):
        """Creates a new Pool instance and starts its monitor thread.

        @param factory: A factory object that will be called upon to create new
                        containers.  The passed object must have a method
                        called "create_container" that takes no arguments and
                        returns an instance of a Container.
        @param size: The size of the Pool.  The Pool attempts to keep this many
                     containers running at all times.

        @raise ValueError: If size is less than 2.
        """
        # A pool holding fewer than two containers is pointless; reject it.
        if size < 2:
            raise ValueError('Invalid pool size.')

        logging.debug('Pool.__init__ called. Size: %d', size)
        # The bounded queue is shared with the monitor, which fills it.
        self._pool = Queue.Queue(size)
        self._monitor = _Monitor(factory, self._pool)
        self._monitor.start()

    def get(self, timeout=0):
        """Gets a container from the pool.

        @param timeout: Number of seconds to wait before returning.
                        - If 0 (the default), return immediately.  If a
                          Container is not immediately available, return None.
                        - If a positive number, block at most <timeout>
                          seconds, then return None if a Container was not
                          available within that time.
                        - If None, block indefinitely until a Container is
                          available.

        @return: A container from the pool, or None if none became available.
        """
        logging.info('Pool.get called.')
        # Only a zero timeout means "do not wait"; None and positive values
        # both request a blocking get.
        should_block = timeout != 0
        try:
            return self._pool.get(block=should_block, timeout=timeout)
        except Queue.Empty:
            return None

    def cleanup(self, timeout=0):
        """Cleans up the container pool.

        Stops the monitor (and with it the worker threads), then destroys all
        Containers still in the Pool.

        @param timeout: For testing.  If this is non-zero, it specifies the
                        number of seconds to wait for each worker to shut down.
                        An error is raised if shutdown has not occurred by
                        then.  If zero (the default), don't wait for worker
                        threads to shut down, just return immediately.
        """
        logging.info('Pool.cleanup called.')
        # The monitor must be stopped first so nothing refills the pool while
        # it is being drained.
        self._monitor.stop(timeout)

        destroyed = 0
        try:
            logging.debug('Emptying container pool')
            while True:
                container = self._pool.get_nowait()
                # Count the container before destroying it, so the metric
                # includes a container whose destroy() call fails.
                destroyed += 1
                container.destroy()
        except Queue.Empty:
            # The pool has been fully drained.
            pass
        finally:
            cleaned_up = metrics.Counter(
                    METRICS_PREFIX + '/containers_cleaned_up')
            cleaned_up.increment_by(destroyed)
            logging.debug('Done. Destroyed %d containers', destroyed)

    @property
    def size(self):
        """Returns the current size of the pool.

        Note that the pool is asynchronous.  Returning a size greater than zero
        does not guarantee that a subsequent call to Pool.get will not block.
        Conversely, returning a size of zero does not guarantee that a
        subsequent call to Pool.get will block.
        """
        return self._pool.qsize()

    @property
    def capacity(self):
        """Returns the max size of the pool."""
        return self._pool.maxsize

    @property
    def errors(self):
        """Returns worker errors.

        @return: A Queue containing all the errors encountered on worker
                 threads.
        """
        return self._monitor.errors

    @property
    def worker_count(self):
        """Returns the number of currently active workers.

        Note that this isn't quite the same as the number of currently alive
        worker threads.  Worker threads that have timed out or been cancelled
        may be technically alive, but they are not included in this count.
        """
        tracked_workers = self._monitor._workers
        return len(tracked_workers)
class _Monitor(threading.Thread):
    """A thread that manages the creation of containers for the pool.

    Container creation is potentially time-consuming and can hang or crash.  The
    Monitor class manages a pool of independent threads, each responsible for
    the creation of a single Container.  This provides parallelized container
    creation and ensures that a single Container creation hanging/crashing does
    not starve or crash the Pool.
    """

    def __init__(self, factory, pool):
        """Creates a new monitor.

        @param factory: A container factory.
        @param pool: A pool instance to push created containers into.
        """
        super(_Monitor, self).__init__(name='pool_monitor')

        self._factory = factory
        self._pool = pool

        # List of worker threads.  Access this only from the monitor thread.
        self._worker_max = _MAX_CONCURRENT_WORKERS
        self._workers = []

        # A flag for stopping the monitor.  Deliberately not called "_stop":
        # CPython 3's threading.Thread has a private _stop() method, and
        # shadowing it with a bool breaks join().
        self._stop_requested = False

        # Stores errors from worker threads.
        self.errors = Queue.Queue()

        # Throttle on errors, to avoid log spam and CPU spinning.
        self._error_timestamps = collections.deque()

    def run(self):
        """Supplies the container pool with containers.

        Loops until stop() is called, expiring old errors, spawning workers and
        polling them once per _MIN_MONITOR_PERIOD.  On exit, all workers still
        being tracked are cancelled.
        """
        logging.debug('Start event loop.')
        while not self._stop_requested:
            self._clear_old_errors()
            self._create_workers()
            self._poll_workers()
            time.sleep(_MIN_MONITOR_PERIOD)
        logging.debug('Exit event loop.')

        # Clean up - stop all workers.
        for worker in self._workers:
            worker.cancel()

    def stop(self, timeout=0):
        """Stops this thread.

        This function blocks until the monitor thread has stopped.

        @param timeout: If this is a non-zero number, wait this long (in
                        seconds) for each worker thread to stop.  If zero (the
                        default), don't wait for worker threads to exit.

        @raise WorkerTimeoutError: If a worker thread does not exit within the
                                   specified timeout.
        """
        logging.info('Stop requested.')
        self._stop_requested = True
        self.join()
        logging.info('Stopped.')

        # Wait for workers if timeout was requested.  The monitor thread has
        # exited by now, so reading _workers from this thread is safe.
        if timeout > 0:
            logging.debug('Waiting for workers to terminate...')
            for worker in self._workers:
                worker.join(timeout)
                if worker.is_alive():
                    raise lxc_error.WorkerTimeoutError()

    def _create_workers(self):
        """Spawns workers to handle container requests.

        At most one worker is created per call; run() calls this once per
        event loop iteration.

        This method modifies the _workers list and should only be called from
        within run().
        """
        if self._pool.full():
            return

        # Do not exceed the worker limit.
        if len(self._workers) >= self._worker_max:
            return

        too_many_errors = len(self._error_timestamps) >= _MAX_ERRORS_PER_HOUR
        metrics.Counter(METRICS_PREFIX + '/error_throttled',
                        field_spec=[ts_mon.BooleanField('throttled')]
                        ).increment(fields={'throttled': too_many_errors})
        # Throttle if too many errors occur.
        if too_many_errors:
            logging.warning('Error throttled (until %d)',
                            self._error_timestamps[0] + 3600)
            return

        # Create workers to refill the pool.
        qsize = self._pool.qsize()
        shortfall = self._pool.maxsize - qsize

        # Every empty slot already has a worker assigned to it.  (The worker
        # limit was already enforced above, so it needs no re-check here.)
        if len(self._workers) >= shortfall:
            return

        # Avoid spamming - only log if the monitor is taking some action.  Log
        # this before creating worker threads, because we are counting live
        # threads and want to avoid race conditions w.r.t. threads actually
        # starting.
        # This can include workers that aren't currently in the self._workers
        # list, e.g. workers that were dropped from the list because they
        # timed out.
        active_workers = sum(1 for t in threading.enumerate()
                             if isinstance(t, _Worker))
        # qsize    : Current size of the container pool.
        # shortfall: Number of empty slots currently in the pool.
        # workers  : Number of live worker threads, counted before the new
        #            worker is started.
        logging.debug('qsize:%d shortfall:%d workers:%d',
                      qsize, shortfall, active_workers)

        worker = _Worker(self._factory,
                         self._on_worker_result,
                         self._on_worker_error)
        worker.start()
        self._workers.append(worker)

    def _poll_workers(self):
        """Checks worker states and drops finished workers.

        This method modifies the _workers list and should only be called from
        within run().

        Workers whose check_health() returns False (completed, cancelled, or
        timed out) are removed from the worker list.  Results and errors are
        reported by the workers themselves through the callbacks.
        """
        self._workers = [worker for worker in self._workers
                         if worker.check_health()]

    def _on_worker_result(self, result):
        """Receives results from completed worker threads.

        Pass this as the result callback for worker threads.  Worker threads
        should call this when they produce a container.

        @param result: The newly created container; it is put into the pool.
        """
        logging.debug('Worker result: %r', result)
        self._pool.put(result)

    def _on_worker_error(self, worker, err):
        """Receives errors from worker threads.

        Pass this as the error callback for worker threads.  Worker threads
        should call this if errors occur.

        @param worker: The worker thread reporting the error.
        @param err: The error; it is appended to the public errors queue.
        """
        # The timestamp feeds the errors-per-hour throttle.
        timestamp = time.time()
        self._error_timestamps.append(timestamp)
        self.errors.put(err)
        logging.error('[%d] Worker error: %s', worker.ident, err)

    def _clear_old_errors(self):
        """Clears errors more than an hour old out of the log."""
        one_hour_ago = time.time() - 3600
        metrics.Counter(METRICS_PREFIX + '/recent_errors'
                        ).increment_by(len(self._error_timestamps))
        # Timestamps are appended in arrival order, so expired entries are
        # popped from the left until a recent one is reached.
        while (self._error_timestamps and
               self._error_timestamps[0] < one_hour_ago):
            self._error_timestamps.popleft()
            # Avoid logspam - log only when some action was taken.
            logging.error('Worker error count: %d', len(self._error_timestamps))
class _Worker(threading.Thread):
    """A worker thread tasked with managing the creation of a single container.

    The worker is a daemon thread that calls upon a container factory to create
    a single container.  If container creation raises any exceptions, they are
    logged and reported through the error callback, and the worker exits.  The
    worker also provides a mechanism for the parent thread to impose timeouts
    on container creation.
    """

    def __init__(self, factory, result_cb, error_cb):
        """Creates a new Worker.

        @param factory: A factory object that will be called upon to create
                        Containers.
        @param result_cb: Callable invoked with the new container once it has
                          been created and started successfully.
        @param error_cb: Callable invoked as error_cb(worker, err) when
                         container creation fails or times out.
        """
        super(_Worker, self).__init__(name='pool_worker')

        # Hanging worker threads should not keep the pool process alive.
        self.daemon = True

        self._factory = factory

        self._result_cb = result_cb
        self._error_cb = error_cb

        self._cancelled = False
        self._start_time = None

        # A lock for breaking race conditions in worker cancellation.  Use a
        # recursive lock because check_health calls cancel while already
        # holding it.
        self._completion_lock = threading.RLock()
        self._completed = False

    def run(self):
        """Creates a single container.

        The container is handed to the result callback only if it was both
        created and started successfully and the worker was not cancelled in
        the meantime.  Otherwise any container that was created is destroyed.
        """
        self._start_time = time.time()
        container = None
        # Stays False if creation or startup raises, so that the cleanup below
        # never puts a half-initialized container into the pool.
        started_ok = False
        try:
            container = self._factory.create_container()
            container.start(wait_for_network=True)
            started_ok = True
        except Exception as e:
            logging.error('Worker error: %s', error.format_error())
            self._error_cb(self, e)
        finally:
            # All this has to happen atomically, otherwise race conditions can
            # arise w.r.t. cancellation.
            with self._completion_lock:
                self._completed = True
                if container is None:
                    # The factory failed before producing anything; there is
                    # nothing to publish or clean up.
                    pass
                elif self._cancelled or not started_ok:
                    # Cancelled jobs and containers that failed to start must
                    # not reach the pool; destroy the container instead.
                    container.destroy()
                else:
                    # Hand the working container over to the pool.
                    self._result_cb(container)

    def cancel(self):
        """Cancels the work request.

        The container will be destroyed when created, instead of being added to
        the container pool.

        @return: True if the worker was marked as cancelled, False if it had
                 already completed (in which case nothing is changed).
        """
        with self._completion_lock:
            if self._completed:
                return False
            self._cancelled = True
            return True

    def check_health(self):
        """Checks that a worker is alive and has not timed out.

        Checks the run time of the worker to make sure it hasn't timed out.
        Cancels workers that exceed the timeout and reports a
        WorkerTimeoutError through the error callback.

        @return: True if the worker is alive and has not timed out, False
                 otherwise.
        """
        # Acquire the completion lock so as to avoid race conditions if the
        # factory happens to return just as we are timing out.
        with self._completion_lock:
            if not self.is_alive() or self._cancelled or self._completed:
                return False

            # Thread hasn't started yet - count this as healthy.
            if self._start_time is None:
                return True

            # If alive, check the timeout and cancel if necessary.
            runtime = time.time() - self._start_time
            if runtime > _CONTAINER_CREATION_TIMEOUT:
                # cancel() re-acquires the (recursive) completion lock.
                if self.cancel():
                    self._error_cb(self, lxc_error.WorkerTimeoutError())
                    return False

        return True