"""Autotest AFE Cleanup used by the scheduler"""

import contextlib
import logging
import random
import time

from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models
from autotest_lib.scheduler import scheduler_config
from autotest_lib.client.common_lib import global_config
from autotest_lib.client.common_lib import host_protections

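# chromite metrics are only available in some environments; fall back to the
# no-op metrics mock so the scheduler can still run when chromite is absent.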
try:
    from chromite.lib import metrics
except ImportError:
    metrics = utils.metrics_mock


_METRICS_PREFIX = 'chromeos/autotest/scheduler/cleanup'


class PeriodicCleanup(object):
    """Base class to schedule periodical cleanup work.
    """

    def __init__(self, db, clean_interval_minutes, run_at_initialize=False):
        self._db = db
        self.clean_interval_minutes = clean_interval_minutes
        self._last_clean_time = time.time()
        self._run_at_initialize = run_at_initialize


    def initialize(self):
        """Method called by scheduler at the startup.
        """
        if self._run_at_initialize:
            self._cleanup()


    def run_cleanup_maybe(self):
        """Test if cleanup method should be called.
        """
        should_cleanup = (self._last_clean_time +
                          self.clean_interval_minutes * 60
                          < time.time())
        if should_cleanup:
            self._cleanup()
            self._last_clean_time = time.time()


    def _cleanup(self):
        """Abrstract cleanup method."""
        raise NotImplementedError

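# Illustrative sketch (not part of the scheduler): a subclass only needs to
# implement _cleanup(); the scheduler calls initialize() once at startup and
# run_cleanup_maybe() periodically. The class name below is hypothetical.
#
#     class NightlyLogPurge(PeriodicCleanup):
#         def _cleanup(self):
#             logging.info('Purging stale logs')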

class UserCleanup(PeriodicCleanup):
    """User cleanup that is controlled by the global config variable
       clean_interval_minutes in the SCHEDULER section.
    """

    def __init__(self, db, clean_interval_minutes):
        super(UserCleanup, self).__init__(db, clean_interval_minutes)
        self._last_reverify_time = time.time()


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/user/durations')
    def _cleanup(self):
        logging.info('Running periodic cleanup')
        self._abort_timed_out_jobs()
        self._abort_jobs_past_max_runtime()
        self._clear_inactive_blocks()
        self._check_for_db_inconsistencies()
        self._reverify_dead_hosts()
        self._django_session_cleanup()


    def _abort_timed_out_jobs(self):
        logging.info(
                'Aborting all jobs that have timed out and are not complete')
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout_mins MINUTE < NOW()'])
        jobs = query.distinct()
        if not jobs:
            return

        with _cleanup_warning_banner('timed out jobs', len(jobs)):
            for job in jobs:
                logging.warning('Aborting job %d due to job timeout', job.id)
                job.abort()
        _report_detected_errors('jobs_timed_out', len(jobs))


    def _abort_jobs_past_max_runtime(self):
        """
        Abort executions that have started and are past the job's max runtime.
        """
        logging.info('Aborting all jobs that have passed maximum runtime')
        rows = self._db.execute("""
            SELECT hqe.id FROM afe_host_queue_entries AS hqe
            WHERE NOT hqe.complete AND NOT hqe.aborted AND EXISTS
            (select * from afe_jobs where hqe.job_id=afe_jobs.id and
             hqe.started_on + INTERVAL afe_jobs.max_runtime_mins MINUTE < NOW())
            """)
        query = models.HostQueueEntry.objects.filter(
            id__in=[row[0] for row in rows])
        hqes = query.distinct()
        if not hqes:
            return

        with _cleanup_warning_banner('hqes past max runtime', len(hqes)):
            for queue_entry in hqes:
                logging.warning('Aborting entry %s due to max runtime',
                                queue_entry)
                queue_entry.abort()
        _report_detected_errors('hqes_past_max_runtime', len(hqes))


    def _check_for_db_inconsistencies(self):
        logging.info('Cleaning db inconsistencies')
        self._check_all_invalid_related_objects()


    def _check_invalid_related_objects_one_way(self, invalid_model,
                                               relation_field, valid_model):
        if 'invalid' not in invalid_model.get_field_dict():
            return

        invalid_objects = list(invalid_model.objects.filter(invalid=True))
        invalid_model.objects.populate_relationships(
                invalid_objects, valid_model, 'related_objects')
        if not invalid_objects:
            return

        num_objects_with_invalid_relations = 0
        errors = []
        for invalid_object in invalid_objects:
            if invalid_object.related_objects:
                related_objects = invalid_object.related_objects
                related_list = ', '.join(str(x) for x in related_objects)
                num_objects_with_invalid_relations += 1
                errors.append('Invalid %s is related to: %s' %
                              (invalid_object, related_list))
                related_manager = getattr(invalid_object, relation_field)
                related_manager.clear()

        # Only log warnings after we're sure we've seen at least one invalid
        # model with valid relations, to avoid printing empty banners.
        if errors:
            invalid_model_name = invalid_model.__name__
            valid_model_name = valid_model.__name__
            banner = 'invalid %s related to valid %s' % (invalid_model_name,
                                                         valid_model_name)
            with _cleanup_warning_banner(banner, len(errors)):
                for error in errors:
                    logging.warning(error)
            _report_detected_errors(
                    'invalid_related_objects',
                    num_objects_with_invalid_relations,
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})
            _report_detected_errors(
                    'invalid_related_objects_relations',
                    len(errors),
                    fields={'invalid_model': invalid_model_name,
                            'valid_model': valid_model_name})


    def _check_invalid_related_objects(self, first_model, first_field,
                                       second_model, second_field):
        self._check_invalid_related_objects_one_way(
                first_model,
                first_field,
                second_model,
        )
        self._check_invalid_related_objects_one_way(
                second_model,
                second_field,
                first_model,
        )


    def _check_all_invalid_related_objects(self):
        model_pairs = ((models.Host, 'labels', models.Label, 'host_set'),
                       (models.AclGroup, 'hosts', models.Host, 'aclgroup_set'),
                       (models.AclGroup, 'users', models.User, 'aclgroup_set'),
                       (models.Test, 'dependency_labels', models.Label,
                        'test_set'))
        for first_model, first_field, second_model, second_field in model_pairs:
            self._check_invalid_related_objects(
                    first_model,
                    first_field,
                    second_model,
                    second_field,
            )


    def _clear_inactive_blocks(self):
        logging.info('Clear out blocks for all completed jobs.')
        # This would be simpler using NOT IN (subquery), but MySQL treats all
        # IN subqueries as dependent, so the NOT EXISTS form below optimizes
        # much better.
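        # For reference, the equivalent (but slower) NOT IN form would look
        # roughly like:
        #
        #     DELETE FROM afe_ineligible_host_queues
        #     WHERE job_id NOT IN
        #         (SELECT job_id FROM afe_host_queue_entries
        #          WHERE NOT complete)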
        self._db.execute("""
                DELETE ihq FROM afe_ineligible_host_queues ihq
                WHERE NOT EXISTS
                    (SELECT job_id FROM afe_host_queue_entries hqe
                     WHERE NOT hqe.complete AND hqe.job_id = ihq.job_id)""")


    def _should_reverify_hosts_now(self):
        reverify_period_sec = (scheduler_config.config.reverify_period_minutes
                               * 60)
        if reverify_period_sec == 0:
            return False
        return (self._last_reverify_time + reverify_period_sec) <= time.time()


    def _choose_subset_of_hosts_to_reverify(self, hosts):
        """Given hosts needing verification, return a subset to reverify."""
        max_at_once = scheduler_config.config.reverify_max_hosts_at_once
        if max_at_once > 0 and len(hosts) > max_at_once:
            return random.sample(hosts, max_at_once)
        return sorted(hosts)


    def _reverify_dead_hosts(self):
        if not self._should_reverify_hosts_now():
            return

        self._last_reverify_time = time.time()
        logging.info('Checking for dead hosts to reverify')
        hosts = models.Host.objects.filter(
                status=models.Host.Status.REPAIR_FAILED,
                locked=False,
                invalid=False)
        hosts = hosts.exclude(
                protection=host_protections.Protection.DO_NOT_VERIFY)
        if not hosts:
            return

        hosts = list(hosts)
        total_hosts = len(hosts)
        hosts = self._choose_subset_of_hosts_to_reverify(hosts)
        logging.info('Reverifying dead hosts (%d of %d)', len(hosts),
                     total_hosts)
        with _cleanup_warning_banner('reverify dead hosts', len(hosts)):
            for host in hosts:
                logging.warning(host.hostname)
        _report_detected_errors('dead_hosts_triggered_reverify', len(hosts))
        _report_detected_errors('dead_hosts_require_reverify', total_hosts)
        for host in hosts:
            models.SpecialTask.schedule_special_task(
                    host=host, task=models.SpecialTask.Task.VERIFY)


    def _django_session_cleanup(self):
        """Clean up django_session since django doesn't for us.
           http://www.djangoproject.com/documentation/0.96/sessions/
        """
        logging.info('Deleting old sessions from django_session')
        sql = 'TRUNCATE TABLE django_session'
        self._db.execute(sql)


class TwentyFourHourUpkeep(PeriodicCleanup):
    """Cleanup that runs at the startup of monitor_db and every subsequent
       twenty four hours.
    """


    def __init__(self, db, drone_manager, run_at_initialize=True):
        """Initialize TwentyFourHourUpkeep.

        @param db: Database connection object.
        @param drone_manager: DroneManager to access drones.
        @param run_at_initialize: True to run cleanup when scheduler starts.
                                  Default is set to True.

        """
        self.drone_manager = drone_manager
        clean_interval_minutes = 24 * 60 # 24 hours
        super(TwentyFourHourUpkeep, self).__init__(
            db, clean_interval_minutes, run_at_initialize=run_at_initialize)


    @metrics.SecondsTimerDecorator(_METRICS_PREFIX + '/daily/durations')
    def _cleanup(self):
        logging.info('Running 24 hour clean up')
        self._check_for_uncleanable_db_inconsistencies()
        self._cleanup_orphaned_containers()


    def _check_for_uncleanable_db_inconsistencies(self):
        logging.info('Checking for uncleanable DB inconsistencies')
        self._check_for_active_and_complete_queue_entries()
        self._check_for_multiple_platform_hosts()
        self._check_for_no_platform_hosts()


    def _check_for_active_and_complete_queue_entries(self):
        query = models.HostQueueEntry.objects.filter(active=True, complete=True)
        num_bad_hqes = query.count()
        if num_bad_hqes == 0:
            return

        num_aborted = 0
        logging.warning('%d queue entries found with active=complete=1',
                        num_bad_hqes)
        with _cleanup_warning_banner('active and complete hqes', num_bad_hqes):
            for entry in query:
                if entry.status == 'Aborted':
                    entry.active = False
                    entry.save()
                    recovery_path = 'was also aborted, set active to False'
                    num_aborted += 1
                else:
                    recovery_path = 'cannot recover'
                logging.warning('%s (recovery: %s)', entry.get_object_dict(),
                                recovery_path)
        _report_detected_errors('hqes_active_and_complete', num_bad_hqes)
        _report_detected_errors('hqes_aborted_set_to_inactive', num_aborted)


    def _check_for_multiple_platform_hosts(self):
        rows = self._db.execute("""
            SELECT afe_hosts.id, hostname, COUNT(1) AS platform_count,
                   GROUP_CONCAT(afe_labels.name)
            FROM afe_hosts
            INNER JOIN afe_hosts_labels ON
                    afe_hosts.id = afe_hosts_labels.host_id
            INNER JOIN afe_labels ON afe_hosts_labels.label_id = afe_labels.id
            WHERE afe_labels.platform
            GROUP BY afe_hosts.id
            HAVING platform_count > 1
            ORDER BY hostname""")

        if rows:
            logging.warning('Cleanup found hosts with multiple platforms')
            with _cleanup_warning_banner('hosts with multiple platforms',
                                         len(rows)):
                for row in rows:
                    logging.warning(' '.join(str(item) for item in row))
            _report_detected_errors('hosts_with_multiple_platforms', len(rows))


    def _check_for_no_platform_hosts(self):
        rows = self._db.execute("""
            SELECT hostname
            FROM afe_hosts
            LEFT JOIN afe_hosts_labels
              ON afe_hosts.id = afe_hosts_labels.host_id
              AND afe_hosts_labels.label_id IN (SELECT id FROM afe_labels
                                                WHERE platform)
            WHERE NOT afe_hosts.invalid AND afe_hosts_labels.host_id IS NULL""")
        if rows:
            with _cleanup_warning_banner('hosts with no platform', len(rows)):
                for row in rows:
                    logging.warning(row[0])
            _report_detected_errors('hosts_with_no_platform', len(rows))


    def _cleanup_orphaned_containers(self):
        """Cleanup orphaned containers in each drone.

        This queues an lxc_cleanup call on each drone without waiting for the
        script to finish, since the cleanup can take minutes; the script's
        output is logged.

        """
        ssp_enabled = global_config.global_config.get_config_value(
                'AUTOSERV', 'enable_ssp_container')
        if not ssp_enabled:
            logging.info(
                    'Server-side packaging is not enabled, no need to clean '
                    'up orphaned containers.')
            return
        self.drone_manager.cleanup_orphaned_containers()


def _report_detected_errors(metric_name, gauge, fields={}):
    """Reports a gauge metric for errors detected.

    @param metric_name: Name of the metric to report about.
    @param gauge: Outstanding number of unrecoverable errors of this type.
    @param fields: Optional fields to include with the metric.
    """
    m = '%s/errors_detected/%s' % (_METRICS_PREFIX, metric_name)
    metrics.Gauge(m).set(gauge, fields=fields)
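
# For example, _report_detected_errors('jobs_timed_out', 3) sets the gauge
# 'chromeos/autotest/scheduler/cleanup/errors_detected/jobs_timed_out' to 3.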


@contextlib.contextmanager
def _cleanup_warning_banner(banner, error_count=None):
    """Put a clear context in the logs around list of errors

    @param: banner: The identifying header to print for context.
    @param: error_count: If not None, the number of errors detected.
    """
    if error_count is not None:
        banner += ' (total: %d)' % error_count
    logging.warning('#### START: %s ####', banner)
    try:
        yield
    finally:
        logging.warning('#### END: %s ####', banner)
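

# Illustrative use of _cleanup_warning_banner (a sketch, based on the
# timed-out jobs cleanup above): the warnings logged inside the with-block
# come out framed like:
#
#     #### START: timed out jobs (total: 2) ####
#     Aborting job 101 due to job timeout
#     Aborting job 102 due to job timeout
#     #### END: timed out jobs (total: 2) ####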