# Copyright 2017 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import Queue
import collections
import logging
import threading
import time

import common
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.site_utils.lxc.container_pool import error as lxc_error
from autotest_lib.site_utils.lxc.constants import \
    CONTAINER_POOL_METRICS_PREFIX as METRICS_PREFIX

try:
    from chromite.lib import metrics
    from infra_libs import ts_mon
except ImportError:
    import mock
    metrics = utils.metrics_mock
    ts_mon = mock.Mock()

# The maximum number of concurrent workers.  Each worker is responsible for
# managing the creation of a single container.
# TODO(kenobi): This may need to be adjusted for different hosts (e.g. full
# vs quarter shards)
_MAX_CONCURRENT_WORKERS = 5
# Timeout (in seconds) for container creation.  After this amount of time,
# container creation tasks are abandoned and retried.
_CONTAINER_CREATION_TIMEOUT = 600
# The period (in seconds) affects the rate at which the monitor thread runs its
# event loop.  This drives a number of other factors, e.g. how long to wait for
# the thread to respond to shutdown requests.
_MIN_MONITOR_PERIOD = 0.1
# The maximum number of errors per hour.  After this limit is reached, further
# pool creation is throttled.
_MAX_ERRORS_PER_HOUR = 200
# Length (in seconds) of the sliding window over which worker errors are
# counted for throttling purposes.
_ERROR_WINDOW_SECONDS = 3600


class Pool(object):
    """A fixed-size pool of LXC containers.

    Containers are created using a factory that is passed to the Pool.  A pool
    size is passed at construction time - this is the number of containers the
    Pool will attempt to maintain.  Whenever the number of containers falls
    below the given size, the Pool will start creating new containers to
    replenish itself.

    In order to avoid overloading the host, the number of simultaneous
    container creations is limited to _MAX_CONCURRENT_WORKERS.

    When container creation produces errors, those errors are saved (see
    Pool.errors).  It is the client's responsibility to periodically check and
    empty out the error queue.
    """

    def __init__(self, factory, size):
        """Creates a new Pool instance.

        @param factory: A factory object that will be called upon to create new
                        containers.  The passed object must have a method
                        called "create_container" that takes no arguments and
                        returns an instance of a Container.
        @param size: The size of the Pool.  The Pool attempts to keep this many
                     containers running at all times.

        @raise ValueError: If size is less than 2.
        """
        # Pools of size less than 2 don't make sense.  Don't allow them.
        if size < 2:
            raise ValueError('Invalid pool size.')

        logging.debug('Pool.__init__ called. Size: %d', size)
        self._pool = Queue.Queue(size)
        # The monitor thread starts refilling the pool immediately.
        self._monitor = _Monitor(factory, self._pool)
        self._monitor.start()

    def get(self, timeout=0):
        """Gets a container from the pool.

        @param timeout: Number of seconds to wait before returning.
                        - If 0 (the default), return immediately.  If a
                          Container is not immediately available, return None.
                        - If a positive number, block at most <timeout>
                          seconds, then return None if a Container was not
                          available within that time.
                        - If None, block indefinitely until a Container is
                          available.

        @return: A container from the pool, or None if none became available
                 within the requested timeout.
        """
        logging.info('Pool.get called.')
        try:
            # Block only if timeout is not zero.
            return self._pool.get(block=(timeout != 0), timeout=timeout)
        except Queue.Empty:
            return None

    def cleanup(self, timeout=0):
        """Cleans up the container pool.

        Stops all worker threads, and destroys all Containers still in the
        Pool.

        @param timeout: For testing.  If this is non-zero, it specifies the
                        number of seconds to wait for each worker to shut down.
                        An error is raised if shutdown has not occurred by
                        then.  If zero (the default), don't wait for worker
                        threads to shut down, just return immediately.
        """
        logging.info('Pool.cleanup called.')
        # Stop the monitor thread, then drain the pool.
        self._monitor.stop(timeout)

        dcount = 0
        try:
            logging.debug('Emptying container pool')
            while True:
                container = self._pool.get(block=False)
                container.destroy()
                # Count only after destroy() returned, so that the metric and
                # the log line reflect containers that were really destroyed.
                dcount += 1
        except Queue.Empty:
            # The pool has been fully drained.
            pass
        finally:
            metrics.Counter(METRICS_PREFIX + '/containers_cleaned_up'
                            ).increment_by(dcount)
            logging.debug('Done. Destroyed %d containers', dcount)

    @property
    def size(self):
        """Returns the current size of the pool.

        Note that the pool is asynchronous.  Returning a size greater than zero
        does not guarantee that a subsequent call to Pool.get will not block.
        Conversely, returning a size of zero does not guarantee that a
        subsequent call to Pool.get will block.
        """
        return self._pool.qsize()

    @property
    def capacity(self):
        """Returns the max size of the pool."""
        return self._pool.maxsize

    @property
    def errors(self):
        """Returns worker errors.

        @return: A Queue containing all the errors encountered on worker
                 threads.
        """
        return self._monitor.errors

    @property
    def worker_count(self):
        """Returns the number of currently active workers.

        Note that this isn't quite the same as the number of currently alive
        worker threads.  Worker threads that have timed out or been cancelled
        may be technically alive, but they are not included in this count.
        """
        return len(self._monitor._workers)


class _Monitor(threading.Thread):
    """A thread that manages the creation of containers for the pool.

    Container creation is potentially time-consuming and can hang or crash.
    The Monitor class manages a pool of independent threads, each responsible
    for the creation of a single Container.  This provides parallelized
    container creation and ensures that a single Container creation
    hanging/crashing does not starve or crash the Pool.
    """

    def __init__(self, factory, pool):
        """Creates a new monitor.

        @param factory: A container factory.
        @param pool: A pool instance to push created containers into.
        """
        super(_Monitor, self).__init__(name='pool_monitor')

        self._factory = factory
        self._pool = pool

        # List of worker threads.  Access this only from the monitor thread.
        self._worker_max = _MAX_CONCURRENT_WORKERS
        self._workers = []

        # A flag for stopping the monitor.  Deliberately not called "_stop":
        # on Python 3, threading.Thread defines a _stop() method and shadowing
        # it with a bool breaks Thread.join().
        self._stop_requested = False

        # Stores errors from worker threads.
        self.errors = Queue.Queue()

        # Throttle on errors, to avoid log spam and CPU spinning.
        self._error_timestamps = collections.deque()

    def run(self):
        """Supplies the container pool with containers."""
        logging.debug('Start event loop.')
        while not self._stop_requested:
            self._clear_old_errors()
            self._create_workers()
            self._poll_workers()
            time.sleep(_MIN_MONITOR_PERIOD)
        logging.debug('Exit event loop.')

        # Clean up - stop all workers.
        for worker in self._workers:
            worker.cancel()

    def stop(self, timeout=0):
        """Stops this thread.

        This function blocks until the monitor thread has stopped.

        @param timeout: If this is a non-zero number, wait this long (in
                        seconds) for each worker thread to stop.  If zero (the
                        default), don't wait for worker threads to exit.

        @raise WorkerTimeoutError: If a worker thread does not exit within the
                                   specified timeout.
        """
        logging.info('Stop requested.')
        self._stop_requested = True
        self.join()
        logging.info('Stopped.')
        # Wait for workers if timeout was requested.
        if timeout > 0:
            logging.debug('Waiting for workers to terminate...')
            for worker in self._workers:
                worker.join(timeout)
                if worker.is_alive():
                    raise lxc_error.WorkerTimeoutError()

    def _create_workers(self):
        """Spawns workers to handle container requests.

        At most one worker is spawned per call.  This method modifies the
        _workers list and should only be called from within run().
        """
        if self._pool.full():
            return

        # Do not exceed the worker limit.
        if len(self._workers) >= self._worker_max:
            return

        too_many_errors = len(self._error_timestamps) >= _MAX_ERRORS_PER_HOUR
        metrics.Counter(METRICS_PREFIX + '/error_throttled',
                        field_spec=[ts_mon.BooleanField('throttled')]
                        ).increment(fields={'throttled': too_many_errors})
        # Throttle if too many errors occur.
        if too_many_errors:
            # Throttling ends when the oldest recorded error ages out.
            logging.warning('Error throttled (until %d)',
                            self._error_timestamps[0] + _ERROR_WINDOW_SECONDS)
            return

        # Create workers to refill the pool.
        qsize = self._pool.qsize()
        shortfall = self._pool.maxsize - qsize
        old_worker_count = len(self._workers)

        # Avoid spamming - only log if the monitor is taking some action.  Log
        # this before creating worker threads, because we are counting live
        # threads and want to avoid race conditions w.r.t. threads actually
        # starting.
        if (old_worker_count < shortfall and
                old_worker_count < self._worker_max):
            # This can include workers that aren't currently in the
            # self._workers list, e.g. workers that were dropped from the list
            # because they timed out.
            active_workers = sum(1 for t in threading.enumerate()
                                 if isinstance(t, _Worker))
            # qsize    : Current size of the container pool.
            # shortfall: Number of empty slots currently in the pool.
            # workers  : m+n, where m is the current number of active worker
            #            threads and n is the number of new threads created.
            logging.debug('qsize:%d shortfall:%d workers:%d',
                          qsize, shortfall, active_workers)

        if old_worker_count < shortfall:
            worker = _Worker(self._factory,
                             self._on_worker_result,
                             self._on_worker_error)
            worker.start()
            self._workers.append(worker)

    def _poll_workers(self):
        """Checks worker states and deals with them.

        This method modifies the _workers list and should only be called from
        within run().

        Workers that are no longer healthy (finished, cancelled or timed out)
        are dropped from the worker list.  Timed-out workers report their
        error through the error callback as part of the health check.
        """
        self._workers = [worker for worker in self._workers
                         if worker.check_health()]

    def _on_worker_result(self, result):
        """Receives results from completed worker threads.

        Pass this as the result callback for worker threads.  Worker threads
        should call this when they produce a container.

        @param result: The container produced by the worker.
        """
        logging.debug('Worker result: %r', result)
        self._pool.put(result)

    def _on_worker_error(self, worker, err):
        """Receives errors from worker threads.

        Pass this as the error callback for worker threads.  Worker threads
        should call this if errors occur.

        @param worker: The worker thread reporting the error.
        @param err: The error that occurred.
        """
        timestamp = time.time()
        self._error_timestamps.append(timestamp)
        self.errors.put(err)
        logging.error('[%d] Worker error: %s', worker.ident, err)

    def _clear_old_errors(self):
        """Clears errors more than an hour old out of the log."""
        one_hour_ago = time.time() - _ERROR_WINDOW_SECONDS
        metrics.Counter(METRICS_PREFIX + '/recent_errors'
                        ).increment_by(len(self._error_timestamps))
        while (self._error_timestamps and
               self._error_timestamps[0] < one_hour_ago):
            self._error_timestamps.popleft()
            # Avoid logspam - log only when some action was taken.
            logging.error('Worker error count: %d',
                          len(self._error_timestamps))


class _Worker(threading.Thread):
    """A worker thread tasked with managing the creation of a single container.

    The worker is a daemon thread that calls upon a container factory to create
    a single container.  If container creation raises any exceptions, they are
    logged and the worker exits.  The worker also provides a mechanism for the
    parent thread to impose timeouts on container creation.
    """

    def __init__(self, factory, result_cb, error_cb):
        """Creates a new Worker.

        @param factory: A factory object that will be called upon to create
                        Containers.
        @param result_cb: Callback invoked with the new container once it has
                          been created and started.
        @param error_cb: Callback invoked as error_cb(worker, err) when
                         container creation fails or times out.
        """
        super(_Worker, self).__init__(name='pool_worker')
        # Hanging worker threads should not keep the pool process alive.
        self.daemon = True

        self._factory = factory

        self._result_cb = result_cb
        self._error_cb = error_cb

        self._cancelled = False
        self._start_time = None

        # A lock for breaking race conditions in worker cancellation.  Use a
        # recursive lock because check_health requires it.
        self._completion_lock = threading.RLock()
        self._completed = False

    def run(self):
        """Creates a single container.

        The container is handed to the result callback only if it was both
        created and started successfully and the worker was not cancelled in
        the meantime.  Otherwise any container that was created is destroyed.
        """
        self._start_time = time.time()
        container = None
        started = False
        try:
            container = self._factory.create_container()
            container.start(wait_for_network=True)
            started = True
        except Exception as e:
            logging.error('Worker error: %s', error.format_error())
            self._error_cb(self, e)
        finally:
            # All this has to happen atomically, otherwise race conditions can
            # arise w.r.t. cancellation.
            with self._completion_lock:
                self._completed = True
                # Container may be None if errors occurred during creation.
                if container is not None:
                    if started and not self._cancelled:
                        # Hand the running container over to the pool.
                        self._result_cb(container)
                    else:
                        # Either the job was cancelled, or the container was
                        # created but failed to start.  In both cases it must
                        # not reach the pool, so destroy it instead.
                        container.destroy()

    def cancel(self):
        """Cancels the work request.

        The container will be destroyed when created, instead of being added to
        the container pool.

        @return: True if the worker was cancelled, False if it had already
                 completed and could no longer be cancelled.
        """
        with self._completion_lock:
            if self._completed:
                return False
            self._cancelled = True
            return True

    def check_health(self):
        """Checks that a worker is alive and has not timed out.

        Checks the run time of the worker to make sure it hasn't timed out.
        Cancels workers that exceed the timeout.

        @return: True if the worker is alive and has not timed out, False
                 otherwise.
        """
        # Acquire the completion lock so as to avoid race conditions if the
        # factory happens to return just as we are timing out.
        with self._completion_lock:
            if not self.is_alive() or self._cancelled or self._completed:
                return False

            # Thread hasn't started yet - count this as healthy.
            if self._start_time is None:
                return True

            # If alive, check the timeout and cancel if necessary.
            runtime = time.time() - self._start_time
            if runtime > _CONTAINER_CREATION_TIMEOUT:
                if self.cancel():
                    self._error_cb(self, lxc_error.WorkerTimeoutError())
                    return False

        return True