# Plain text  |  312 lines  |  9.13 KB

# pylint: disable-msg=C0111
# Copyright 2008 Google Inc. Released under the GPL v2

import warnings
with warnings.catch_warnings():
    # The 'compiler' module is gone in Python 3.0.  Let's not say
    # so in every log file.
    warnings.simplefilter("ignore", DeprecationWarning)
    import compiler
import logging, textwrap

from autotest_lib.client.common_lib import enum

# Control variables whose absence is reported by ControlData.
REQUIRED_VARS = set(('author', 'doc', 'name', 'time', 'test_type'))
# Control variables whose presence is reported by ControlData.
OBSOLETE_VARS = set(('experimental',))

# Kinds of control file; numbering starts at 1.
CONTROL_TYPE = enum.Enum('Server', 'Client', start_value=1)
# Same names as CONTROL_TYPE, built with string_values=True.
CONTROL_TYPE_NAMES = enum.Enum(*CONTROL_TYPE.names, string_values=True)

class ControlVariableException(Exception):
    """
    Raised for control files that cannot be parsed or whose variables are
    missing, obsolete or invalid.
    """


class ControlData(object):
    """
    Metadata declared by the variables of one control file.

    The constructor receives the already extracted variables and hands each
    one to the matching ``set_<name>`` method, which validates the value and
    stores it as an instance attribute.  Variables without a matching setter
    are ignored.
    """
    # Available TIME settings in control file, the list must be in lower case
    # and in ascending order, test running faster comes first.
    TEST_TIME_LIST = ['fast', 'short', 'medium', 'long', 'lengthy']
    TEST_TIME = enum.Enum(*TEST_TIME_LIST, string_values=False)

    @staticmethod
    def get_test_time_index(time):
        """
        Get the order of estimated test time, based on the TIME setting in
        Control file. Faster test gets a lower index number.

        @param time: TIME setting of a control file; it is lower-cased
                before the lookup.
        @return the value TEST_TIME.get_value() yields for the setting.
        @raises ControlVariableException if the lookup raises
                AttributeError, i.e. time is not a valid TIME setting.
        """
        try:
            return ControlData.TEST_TIME.get_value(time.lower())
        except AttributeError:
            # Raise exception if time value is not a valid TIME setting.
            error_msg = '%s is not a valid TIME.' % time
            logging.error(error_msg)
            raise ControlVariableException(error_msg)


    def __init__(self, vars, path, raise_warnings=False):
        """
        Validate and store the variables of a control file.

        @param vars: dict mapping control variable names to their values.
        @param path: path of the control file; only used in messages.
        @param raise_warnings: if True, every problem raises instead of
                being printed as a warning.
        @raises ControlVariableException if raise_warnings is set and
                required variables are missing or obsolete ones are present.
        """
        # Defaults
        self.path = path
        self.dependencies = set()
        # TODO(jrbarnette): This should be removed once outside
        # code that uses can be changed.
        self.experimental = False
        self.run_verify = True
        self.sync_count = 1
        self.test_parameters = set()
        self.test_category = ''
        self.test_class = ''
        self.retries = 0
        self.job_retries = 0
        # Default to require server-side package. Unless require_ssp is
        # explicitly set to False, server-side package will be used for the
        # job. This can be overridden by global config
        # AUTOSERV/enable_ssp_container
        self.require_ssp = None
        self.attributes = set()

        defined = set(vars)

        # Names are sorted so that the message does not depend on set order.
        missing = REQUIRED_VARS - defined
        if missing:
            self._report_warning(
                    ('WARNING: Not all required control '
                     'variables were specified in %s.  Please define '
                     '%s.') % (self.path, ', '.join(sorted(missing))),
                    raise_warnings)

        obsolete = OBSOLETE_VARS & defined
        if obsolete:
            self._report_warning(
                    ('WARNING: Obsolete variables were '
                     'specified in %s.  Please remove '
                     '%s.') % (self.path, ', '.join(sorted(obsolete))),
                    raise_warnings)

        for key, val in vars.items():
            try:
                self.set_attr(key, val, raise_warnings)
            except Exception as e:
                if raise_warnings:
                    raise
                print('WARNING: %s; skipping' % e)


    @staticmethod
    def _report_warning(warning, raise_warnings):
        """
        Raise warning as a ControlVariableException or print it.

        @param warning: the complete warning message.
        @param raise_warnings: if True raise, otherwise print.
        """
        if raise_warnings:
            raise ControlVariableException(warning)
        # textwrap.wrap() returns a list of lines; fill() joins them so the
        # message is printed as wrapped text instead of as a list repr.
        print(textwrap.fill(warning, 80))


    def set_attr(self, attr, val, raise_warnings=False):
        """
        Store one control variable through its ``set_<attr>`` method.

        @param attr: variable name; it is lower-cased before the lookup.
        @param val: value passed on to the setter.
        @param raise_warnings: not used by this method.
        """
        # Only the lookup may fail silently.  An AttributeError raised inside
        # a setter is a real problem with the value and must reach the caller.
        set_fn = getattr(self, 'set_%s' % attr.lower(), None)
        if set_fn is None:
            # This must not be a variable we care about
            return
        set_fn(val)


    def _set_string(self, attr, val):
        """Store str(val) as attribute attr."""
        setattr(self, attr, str(val))


    def _set_option(self, attr, val, options):
        """
        Store str(val) as attribute attr if it is one of options.

        The comparison ignores case; the value is stored as given.

        @raises ValueError if the value matches none of the options.
        """
        val = str(val)
        if val.lower() not in [x.lower() for x in options]:
            raise ValueError("%s must be one of the following "
                             "options: %s" % (attr,
                             ', '.join(options)))
        setattr(self, attr, val)


    def _set_bool(self, attr, val):
        """
        Store val as a bool in attribute attr.

        str(val) must read "true" or "false", in any letter case.

        @raises ValueError for any other value.
        """
        val = str(val).lower()
        if val == "false":
            val = False
        elif val == "true":
            val = True
        else:
            msg = "%s must be either true or false" % attr
            raise ValueError(msg)
        setattr(self, attr, val)


    def _set_int(self, attr, val, min=None, max=None):
        """
        Store int(val) in attribute attr.

        @param min: smallest accepted value, or None for no lower bound.
        @param max: largest accepted value, or None for no upper bound.
        @raises ValueError if val is not an integer or is out of bounds.
        """
        val = int(val)
        if min is not None and min > val:
            raise ValueError("%s is %d, which is below the "
                             "minimum of %d" % (attr, val, min))
        if max is not None and max < val:
            raise ValueError("%s is %d, which is above the "
                             "maximum of %d" % (attr, val, max))
        setattr(self, attr, val)


    def _set_set(self, attr, val):
        """
        Store the comma separated items of str(val) as a set in attr.

        Items are stripped of surrounding whitespace.  Empty items, as
        produced by an empty value or a trailing comma, are dropped so that
        the set never contains ''.
        """
        items = [x.strip() for x in str(val).split(',')]
        setattr(self, attr, set(item for item in items if item))


    def set_author(self, val):
        """Store the AUTHOR variable as a string."""
        self._set_string('author', val)


    def set_dependencies(self, val):
        """Store the comma separated DEPENDENCIES variable as a set."""
        self._set_set('dependencies', val)


    def set_doc(self, val):
        """Store the DOC variable as a string."""
        self._set_string('doc', val)


    def set_name(self, val):
        """Store the NAME variable as a string."""
        self._set_string('name', val)


    def set_run_verify(self, val):
        """Store the RUN_VERIFY variable as a bool."""
        self._set_bool('run_verify', val)


    def set_sync_count(self, val):
        """Store the SYNC_COUNT variable as an int of at least 1."""
        self._set_int('sync_count', val, min=1)


    def set_suite(self, val):
        """Store the SUITE variable as a string."""
        self._set_string('suite', val)


    def set_time(self, val):
        """Store the TIME variable; it must be one of TEST_TIME_LIST."""
        self._set_option('time', val, ControlData.TEST_TIME_LIST)


    def set_test_class(self, val):
        """Store the lower-cased TEST_CLASS variable."""
        self._set_string('test_class', val.lower())


    def set_test_category(self, val):
        """Store the lower-cased TEST_CATEGORY variable."""
        self._set_string('test_category', val.lower())


    def set_test_type(self, val):
        """Store the TEST_TYPE variable; it must name a CONTROL_TYPE."""
        self._set_option('test_type', val, list(CONTROL_TYPE.names))


    def set_test_parameters(self, val):
        """Store the comma separated TEST_PARAMETERS variable as a set."""
        self._set_set('test_parameters', val)


    def set_retries(self, val):
        """Store the RETRIES variable as an int."""
        self._set_int('retries', val)


    def set_job_retries(self, val):
        """Store the JOB_RETRIES variable as an int."""
        self._set_int('job_retries', val)


    def set_bug_template(self, val):
        """Store the BUG_TEMPLATE variable; non-dict values are ignored."""
        if isinstance(val, dict):
            self.bug_template = val


    def set_require_ssp(self, val):
        """Store the REQUIRE_SSP variable as a bool."""
        self._set_bool('require_ssp', val)


    def set_attributes(self, val):
        """
        Store the comma separated ATTRIBUTES variable as a set.

        'subsystem:default' is added when no item starts with 'subsystem'.
        """
        # Add subsystem:default if subsystem is not specified.
        self._set_set('attributes', val)
        if not any(a.startswith('subsystem') for a in self.attributes):
            self.attributes.add('subsystem:default')


def _extract_const(expr):
    """
    Return the value of a constant AST node as a stripped string.

    @param expr: node that must be a compiler.ast.Const whose value is a
            str, int, float or unicode.
    @return str(expr.value) without surrounding whitespace.
    @raises AssertionError if the node or its value has any other class.
    """
    # Raised explicitly rather than with `assert`: the callers in this module
    # catch AssertionError to skip unsupported nodes, and assert statements
    # are removed when Python runs with -O.
    if expr.__class__ != compiler.ast.Const:
        raise AssertionError('%r is not a Const node' % (expr,))
    if expr.value.__class__ not in (str, int, float, unicode):
        raise AssertionError('unsupported constant class %s' %
                             expr.value.__class__)
    return str(expr.value).strip()


def _extract_dict(expr):
    """
    Convert a dict literal AST node into a plain dict.

    Entries whose key or value cannot be extracted are left out.

    @param expr: node that must be a compiler.ast.Dict with a list of items.
    @return dict of extracted keys and values.
    @raises AssertionError if expr is not such a node.
    """
    # Raised explicitly rather than with `assert`: callers catch
    # AssertionError to skip unsupported nodes, and assert statements are
    # removed when Python runs with -O.
    if expr.__class__ != compiler.ast.Dict:
        raise AssertionError('%r is not a Dict node' % (expr,))
    if expr.items.__class__ != list:
        raise AssertionError('Dict node items are not a list')

    cf_dict = {}
    for key, value in expr.items:
        try:
            key = _extract_const(key)
            val = _extract_expression(value)
        except (AssertionError, ValueError):
            # Unsupported key or value: skip this entry only.
            continue
        cf_dict[key] = val
    return cf_dict


def _extract_list(expr):
    """
    Convert a list literal AST node into a plain list.

    Elements that cannot be extracted are left out.

    @param expr: node that must be a compiler.ast.List.
    @return list of the extracted element values, in source order.
    @raises AssertionError if expr is not a List node.
    """
    # Raised explicitly rather than with `assert`: callers catch
    # AssertionError to skip unsupported nodes, and assert statements are
    # removed when Python runs with -O.
    if expr.__class__ != compiler.ast.List:
        raise AssertionError('%r is not a List node' % (expr,))

    list_values = []
    for value in expr.nodes:
        try:
            list_values.append(_extract_expression(value))
        except (AssertionError, ValueError):
            # Unsupported element: skip it and keep the rest.
            continue
    return list_values


def _extract_name(expr):
    """
    Return the identifier of a Name node as a string.

    Only the names 'False', 'True' and 'None' are accepted.

    @param expr: node that must be a compiler.ast.Name.
    @return the name as a str.
    @raises AssertionError if expr is not a Name node or names anything else.
    """
    # Raised explicitly rather than with `assert`: callers catch
    # AssertionError to skip unsupported nodes, and with -O a stripped assert
    # would let arbitrary identifiers through.
    if expr.__class__ != compiler.ast.Name:
        raise AssertionError('%r is not a Name node' % (expr,))
    if expr.name not in ('False', 'True', 'None'):
        raise AssertionError('unsupported name %s' % expr.name)
    return str(expr.name)


def _extract_expression(expr):
    """
    Extract the value of a right-hand side AST node.

    Const, Name, Dict and List nodes are handed to their dedicated
    extractor; the class of the node must match exactly.

    @param expr: the AST node to convert.
    @return whatever the matching extractor returns.
    @raises ValueError if the node is of any other class.
    """
    extractors = (
        (compiler.ast.Const, _extract_const),
        (compiler.ast.Name, _extract_name),
        (compiler.ast.Dict, _extract_dict),
        (compiler.ast.List, _extract_list),
    )
    node_class = expr.__class__
    for ast_class, extract in extractors:
        if node_class == ast_class:
            return extract(expr)
    raise ValueError('Unknown rval %s' % expr)


def _extract_assignment(n):
    """
    Extract a ``NAME = value`` statement as a (key, value) pair.

    @param n: statement node; it must be a compiler.ast.Assign with exactly
            one AssName target.
    @return tuple of the lower-cased target name and the extracted value.
    @raises AssertionError if n is not such an assignment.
    @raises ValueError if the assigned expression is not supported.
    """
    # Checked explicitly rather than with `assert`: finish_parse() relies on
    # AssertionError to skip every other kind of statement, and assert
    # statements are removed when Python runs with -O.  The conditions are
    # evaluated in order, so later ones never touch a node of the wrong shape.
    is_simple_assignment = (
            n.__class__ == compiler.ast.Assign and
            n.nodes.__class__ == list and
            len(n.nodes) == 1 and
            n.nodes[0].__class__ == compiler.ast.AssName and
            n.nodes[0].flags.__class__ == str and
            n.nodes[0].name.__class__ == str)
    if not is_simple_assignment:
        raise AssertionError('%r is not a simple assignment' % (n,))

    val = _extract_expression(n.expr)
    key = n.nodes[0].name.lower()

    return (key, val)


def parse_control_string(control, raise_warnings=False):
    """
    Parse the text of a control file into a ControlData object.

    @param control: source text of the control file.
    @param raise_warnings: passed on to finish_parse().
    @return the result of finish_parse() with an empty path.
    @raises ControlVariableException if the text has a syntax error.
    """
    try:
        tree = compiler.parse(control)
    except SyntaxError as err:
        message = "Error parsing data because %s" % err
        raise ControlVariableException(message)
    return finish_parse(tree, '', raise_warnings)


def parse_control(path, raise_warnings=False):
    """
    Parse the control file at path into a ControlData object.

    @param path: path of the control file to read.
    @param raise_warnings: passed on to finish_parse().
    @return the result of finish_parse() for the parsed file.
    @raises ControlVariableException if the file has a syntax error.
    """
    try:
        tree = compiler.parseFile(path)
    except SyntaxError as err:
        message = "Error parsing %s because %s" % (path, err)
        raise ControlVariableException(message)
    return finish_parse(tree, path, raise_warnings)


def finish_parse(mod, path, raise_warnings):
    """
    Build a ControlData object from a parsed control file.

    Every top-level statement that _extract_assignment() accepts contributes
    one variable; all other statements are skipped.

    @param mod: compiler.ast.Module produced by the compiler module.
    @param path: path of the control file, handed to ControlData.
    @param raise_warnings: handed to ControlData.
    @return the ControlData built from the collected variables.
    """
    assert mod.__class__ == compiler.ast.Module
    assert mod.node.__class__ == compiler.ast.Stmt
    assert mod.node.nodes.__class__ == list

    control_vars = {}
    for statement in mod.node.nodes:
        try:
            name, value = _extract_assignment(statement)
        except (AssertionError, ValueError):
            # Not a supported assignment; ignore the statement.
            continue
        control_vars[name] = value
    return ControlData(control_vars, path, raise_warnings)