# pylint: disable-msg=C0111
# Copyright 2008 Google Inc. Released under the GPL v2
import warnings
with warnings.catch_warnings():
# The 'compiler' module is gone in Python 3.0. Let's not say
# so in every log file.
warnings.simplefilter("ignore", DeprecationWarning)
import compiler
import logging, textwrap
from autotest_lib.client.common_lib import enum
REQUIRED_VARS = set(['author', 'doc', 'name', 'time', 'test_type'])
OBSOLETE_VARS = set(['experimental'])
CONTROL_TYPE = enum.Enum('Server', 'Client', start_value=1)
CONTROL_TYPE_NAMES = enum.Enum(*CONTROL_TYPE.names, string_values=True)
class ControlVariableException(Exception):
pass
class ControlData(object):
# Available TIME settings in control file, the list must be in lower case
# and in ascending order, test running faster comes first.
TEST_TIME_LIST = ['fast', 'short', 'medium', 'long', 'lengthy']
TEST_TIME = enum.Enum(*TEST_TIME_LIST, string_values=False)
@staticmethod
def get_test_time_index(time):
"""
Get the order of estimated test time, based on the TIME setting in
Control file. Faster test gets a lower index number.
"""
try:
return ControlData.TEST_TIME.get_value(time.lower())
except AttributeError:
# Raise exception if time value is not a valid TIME setting.
error_msg = '%s is not a valid TIME.' % time
logging.error(error_msg)
raise ControlVariableException(error_msg)
def __init__(self, vars, path, raise_warnings=False):
# Defaults
self.path = path
self.dependencies = set()
# TODO(jrbarnette): This should be removed once outside
# code that uses can be changed.
self.experimental = False
self.run_verify = True
self.sync_count = 1
self.test_parameters = set()
self.test_category = ''
self.test_class = ''
self.retries = 0
self.job_retries = 0
# Default to require server-side package. Unless require_ssp is
# explicitly set to False, server-side package will be used for the
# job. This can be overridden by global config
# AUTOSERV/enable_ssp_container
self.require_ssp = None
self.attributes = set()
diff = REQUIRED_VARS - set(vars)
if diff:
warning = ('WARNING: Not all required control '
'variables were specified in %s. Please define '
'%s.') % (self.path, ', '.join(diff))
if raise_warnings:
raise ControlVariableException(warning)
print textwrap.wrap(warning, 80)
obsolete = OBSOLETE_VARS & set(vars)
if obsolete:
warning = ('WARNING: Obsolete variables were '
'specified in %s. Please remove '
'%s.') % (self.path, ', '.join(obsolete))
if raise_warnings:
raise ControlVariableException(warning)
print textwrap.wrap(warning, 80)
for key, val in vars.iteritems():
try:
self.set_attr(key, val, raise_warnings)
except Exception, e:
if raise_warnings:
raise
print 'WARNING: %s; skipping' % e
def set_attr(self, attr, val, raise_warnings=False):
attr = attr.lower()
try:
set_fn = getattr(self, 'set_%s' % attr)
set_fn(val)
except AttributeError:
# This must not be a variable we care about
pass
def _set_string(self, attr, val):
val = str(val)
setattr(self, attr, val)
def _set_option(self, attr, val, options):
val = str(val)
if val.lower() not in [x.lower() for x in options]:
raise ValueError("%s must be one of the following "
"options: %s" % (attr,
', '.join(options)))
setattr(self, attr, val)
def _set_bool(self, attr, val):
val = str(val).lower()
if val == "false":
val = False
elif val == "true":
val = True
else:
msg = "%s must be either true or false" % attr
raise ValueError(msg)
setattr(self, attr, val)
def _set_int(self, attr, val, min=None, max=None):
val = int(val)
if min is not None and min > val:
raise ValueError("%s is %d, which is below the "
"minimum of %d" % (attr, val, min))
if max is not None and max < val:
raise ValueError("%s is %d, which is above the "
"maximum of %d" % (attr, val, max))
setattr(self, attr, val)
def _set_set(self, attr, val):
val = str(val)
items = [x.strip() for x in val.split(',')]
setattr(self, attr, set(items))
def set_author(self, val):
self._set_string('author', val)
def set_dependencies(self, val):
self._set_set('dependencies', val)
def set_doc(self, val):
self._set_string('doc', val)
def set_name(self, val):
self._set_string('name', val)
def set_run_verify(self, val):
self._set_bool('run_verify', val)
def set_sync_count(self, val):
self._set_int('sync_count', val, min=1)
def set_suite(self, val):
self._set_string('suite', val)
def set_time(self, val):
self._set_option('time', val, ControlData.TEST_TIME_LIST)
def set_test_class(self, val):
self._set_string('test_class', val.lower())
def set_test_category(self, val):
self._set_string('test_category', val.lower())
def set_test_type(self, val):
self._set_option('test_type', val, list(CONTROL_TYPE.names))
def set_test_parameters(self, val):
self._set_set('test_parameters', val)
def set_retries(self, val):
self._set_int('retries', val)
def set_job_retries(self, val):
self._set_int('job_retries', val)
def set_bug_template(self, val):
if type(val) == dict:
setattr(self, 'bug_template', val)
def set_require_ssp(self, val):
self._set_bool('require_ssp', val)
def set_attributes(self, val):
# Add subsystem:default if subsystem is not specified.
self._set_set('attributes', val)
if not any(a.startswith('subsystem') for a in self.attributes):
self.attributes.add('subsystem:default')
def _extract_const(expr):
assert(expr.__class__ == compiler.ast.Const)
assert(expr.value.__class__ in (str, int, float, unicode))
return str(expr.value).strip()
def _extract_dict(expr):
assert(expr.__class__ == compiler.ast.Dict)
assert(expr.items.__class__ == list)
cf_dict = {}
for key, value in expr.items:
try:
key = _extract_const(key)
val = _extract_expression(value)
except (AssertionError, ValueError):
pass
else:
cf_dict[key] = val
return cf_dict
def _extract_list(expr):
assert(expr.__class__ == compiler.ast.List)
list_values = []
for value in expr.nodes:
try:
list_values.append(_extract_expression(value))
except (AssertionError, ValueError):
pass
return list_values
def _extract_name(expr):
assert(expr.__class__ == compiler.ast.Name)
assert(expr.name in ('False', 'True', 'None'))
return str(expr.name)
def _extract_expression(expr):
if expr.__class__ == compiler.ast.Const:
return _extract_const(expr)
if expr.__class__ == compiler.ast.Name:
return _extract_name(expr)
if expr.__class__ == compiler.ast.Dict:
return _extract_dict(expr)
if expr.__class__ == compiler.ast.List:
return _extract_list(expr)
raise ValueError('Unknown rval %s' % expr)
def _extract_assignment(n):
assert(n.__class__ == compiler.ast.Assign)
assert(n.nodes.__class__ == list)
assert(len(n.nodes) == 1)
assert(n.nodes[0].__class__ == compiler.ast.AssName)
assert(n.nodes[0].flags.__class__ == str)
assert(n.nodes[0].name.__class__ == str)
val = _extract_expression(n.expr)
key = n.nodes[0].name.lower()
return (key, val)
def parse_control_string(control, raise_warnings=False):
try:
mod = compiler.parse(control)
except SyntaxError, e:
raise ControlVariableException("Error parsing data because %s" % e)
return finish_parse(mod, '', raise_warnings)
def parse_control(path, raise_warnings=False):
try:
mod = compiler.parseFile(path)
except SyntaxError, e:
raise ControlVariableException("Error parsing %s because %s" %
(path, e))
return finish_parse(mod, path, raise_warnings)
def finish_parse(mod, path, raise_warnings):
assert(mod.__class__ == compiler.ast.Module)
assert(mod.node.__class__ == compiler.ast.Stmt)
assert(mod.node.nodes.__class__ == list)
vars = {}
for n in mod.node.nodes:
try:
key, val = _extract_assignment(n)
vars[key] = val
except (AssertionError, ValueError):
pass
return ControlData(vars, path, raise_warnings)