普通文本  |  429行  |  17.76 KB

#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from future import standard_library
standard_library.install_aliases()

import copy
import importlib
import inspect
import logging
import os
import pkgutil
import signal
import sys

from vts.runners.host import base_test
from vts.runners.host import config_parser
from vts.runners.host import keys
from vts.runners.host import logger
from vts.runners.host import records
from vts.runners.host import signals
from vts.runners.host import utils


def main():
    """Execute the test class in a test module.

    This is to be used in a test script's main so the script can be executed
    directly. It will discover all the classes that inherit from BaseTestClass
    and excute them. all the test results will be aggregated into one.

    A VTS host-driven test case has three args:
       1st arg: the path of a test case config file.
       2nd arg: the serial ID of a target device (device config).
       3rd arg: the path of a test case data dir.

    Returns:
        The TestResult object that holds the results of the test run.
    """
    test_classes = []
    main_module_members = sys.modules["__main__"]
    for _, module_member in main_module_members.__dict__.items():
        if inspect.isclass(module_member):
            if issubclass(module_member, base_test.BaseTestClass):
                test_classes.append(module_member)
    # TODO(angli): Need to handle the case where more than one test class is in
    # a test script. The challenge is to handle multiple configs and how to do
    # default config in this case.
    if len(test_classes) != 1:
        logging.error("Expected 1 test class per file, found %s.",
                      len(test_classes))
        sys.exit(1)
    test_result = runTestClass(test_classes[0])
    return test_result


def runTestClass(test_class):
    """Execute one test class.

    This will create a TestRunner, execute one test run with one test class.

    Args:
        test_class: The test class to instantiate and execute.

    Returns:
        The TestResult object that holds the results of the test run.
    """
    test_cls_name = test_class.__name__
    if len(sys.argv) < 2:
        logging.warning("Missing a configuration file. Using the default.")
        test_configs = [config_parser.GetDefaultConfig(test_cls_name)]
    else:
        try:
            config_path = sys.argv[1]
            baseline_config = config_parser.GetDefaultConfig(test_cls_name)
            baseline_config[keys.ConfigKeys.KEY_TESTBED] = [
                baseline_config[keys.ConfigKeys.KEY_TESTBED]
            ]
            test_configs = config_parser.load_test_config_file(
                config_path, baseline_config=baseline_config)
        except IndexError:
            logging.error("No valid config file found.")
            sys.exit(1)

    test_identifiers = [(test_cls_name, None)]

    for config in test_configs:
        tr = TestRunner(config, test_identifiers)
        tr.parseTestConfig(config)
        try:
            # Create console signal handler to make sure TestRunner is stopped
            # in the event of termination.
            handler = config_parser.gen_term_signal_handler([tr])
            signal.signal(signal.SIGTERM, handler)
            signal.signal(signal.SIGINT, handler)
            tr.runTestClass(test_class, None)
        finally:
            tr.stop()
            return tr.results


class TestRunner(object):
    """The class that instantiates test classes, executes test cases, and
    report results.

    Attributes:
        self.test_run_info: A dictionary containing the information needed by
                            test classes for this test run, including params,
                            controllers, and other objects. All of these will
                            be passed to test classes.
        self.test_configs: A dictionary that is the original test configuration
                           passed in by user.
        self.id: A string that is the unique identifier of this test run.
        self.log_path: A string representing the path of the dir under which
                       all logs from this test run should be written.
        self.controller_registry: A dictionary that holds the controller
                                  objects used in a test run.
        self.controller_destructors: A dictionary that holds the controller
                                     distructors. Keys are controllers' names.
        self.run_list: A list of tuples specifying what tests to run.
        self.results: The test result object used to record the results of
                      this test run.
        self.running: A boolean signifies whether this test run is ongoing or
                      not.
    """

    def __init__(self, test_configs, run_list):
        self.test_run_info = {}
        self.test_run_info[keys.ConfigKeys.IKEY_DATA_FILE_PATH] = getattr(
            test_configs, keys.ConfigKeys.IKEY_DATA_FILE_PATH, "./")
        self.test_configs = test_configs
        self.testbed_configs = self.test_configs[keys.ConfigKeys.KEY_TESTBED]
        self.testbed_name = self.testbed_configs[
            keys.ConfigKeys.KEY_TESTBED_NAME]
        start_time = logger.getLogFileTimestamp()
        self.id = "{}@{}".format(self.testbed_name, start_time)
        # log_path should be set before parsing configs.
        l_path = os.path.join(self.test_configs[keys.ConfigKeys.KEY_LOG_PATH],
                              self.testbed_name, start_time)
        self.log_path = os.path.abspath(l_path)
        logger.setupTestLogger(self.log_path, self.testbed_name)
        self.controller_registry = {}
        self.controller_destructors = {}
        self.run_list = run_list
        self.results = records.TestResult()
        self.running = False

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.stop()

    def importTestModules(self, test_paths):
        """Imports test classes from test scripts.

        1. Locate all .py files under test paths.
        2. Import the .py files as modules.
        3. Find the module members that are test classes.
        4. Categorize the test classes by name.

        Args:
            test_paths: A list of directory paths where the test files reside.

        Returns:
            A dictionary where keys are test class name strings, values are
            actual test classes that can be instantiated.
        """

        def is_testfile_name(name, ext):
            if ext == ".py":
                if name.endswith("Test") or name.endswith("_test"):
                    return True
            return False

        file_list = utils.find_files(test_paths, is_testfile_name)
        test_classes = {}
        for path, name, _ in file_list:
            sys.path.append(path)
            try:
                module = importlib.import_module(name)
            except:
                for test_cls_name, _ in self.run_list:
                    alt_name = name.replace('_', '').lower()
                    alt_cls_name = test_cls_name.lower()
                    # Only block if a test class on the run list causes an
                    # import error. We need to check against both naming
                    # conventions: AaaBbb and aaa_bbb.
                    if name == test_cls_name or alt_name == alt_cls_name:
                        msg = ("Encountered error importing test class %s, "
                               "abort.") % test_cls_name
                        # This exception is logged here to help with debugging
                        # under py2, because "raise X from Y" syntax is only
                        # supported under py3.
                        logging.exception(msg)
                        raise USERError(msg)
                continue
            for member_name in dir(module):
                if not member_name.startswith("__"):
                    if member_name.endswith("Test"):
                        test_class = getattr(module, member_name)
                        if inspect.isclass(test_class):
                            test_classes[member_name] = test_class
        return test_classes

    def verifyControllerModule(self, module):
        """Verifies a module object follows the required interface for
        controllers.

        Args:
            module: An object that is a controller module. This is usually
                    imported with import statements or loaded by importlib.

        Raises:
            ControllerError is raised if the module does not match the vts.runners.host
            controller interface, or one of the required members is null.
        """
        required_attributes = ("create", "destroy",
                               "VTS_CONTROLLER_CONFIG_NAME")
        for attr in required_attributes:
            if not hasattr(module, attr):
                raise signals.ControllerError(
                    ("Module %s missing required "
                     "controller module attribute %s.") % (module.__name__,
                                                           attr))
            if not getattr(module, attr):
                raise signals.ControllerError(
                    ("Controller interface %s in %s "
                     "cannot be null.") % (attr, module.__name__))

    def registerController(self, module, start_services=True):
        """Registers a controller module for a test run.

        This declares a controller dependency of this test class. If the target
        module exists and matches the controller interface, the controller
        module will be instantiated with corresponding configs in the test
        config file. The module should be imported first.

        Params:
            module: A module that follows the controller module interface.
            start_services: boolean, controls whether services (e.g VTS agent)
                            are started on the target.

        Returns:
            A list of controller objects instantiated from controller_module.

        Raises:
            ControllerError is raised if no corresponding config can be found,
            or if the controller module has already been registered.
        """
        logging.info("cwd: %s", os.getcwd())
        logging.info("adb devices: %s", module.list_adb_devices())
        self.verifyControllerModule(module)
        module_ref_name = module.__name__.split('.')[-1]
        if module_ref_name in self.controller_registry:
            raise signals.ControllerError(
                ("Controller module %s has already "
                 "been registered. It can not be "
                 "registered again.") % module_ref_name)
        # Create controller objects.
        create = module.create
        module_config_name = module.VTS_CONTROLLER_CONFIG_NAME
        if module_config_name not in self.testbed_configs:
            raise signals.ControllerError(("No corresponding config found for"
                                           " %s") % module_config_name)
        try:
            # Make a deep copy of the config to pass to the controller module,
            # in case the controller module modifies the config internally.
            original_config = self.testbed_configs[module_config_name]
            controller_config = copy.deepcopy(original_config)
            logging.info("controller_config: %s", controller_config)
            if "use_vts_agent" not in self.testbed_configs:
                objects = create(controller_config, start_services)
            else:
                objects = create(controller_config,
                                 self.testbed_configs["use_vts_agent"])
        except:
            logging.exception(("Failed to initialize objects for controller "
                               "%s, abort!"), module_config_name)
            raise
        if not isinstance(objects, list):
            raise ControllerError(("Controller module %s did not return a list"
                                   " of objects, abort.") % module_ref_name)
        self.controller_registry[module_ref_name] = objects
        logging.debug("Found %d objects for controller %s", len(objects),
                      module_config_name)
        destroy_func = module.destroy
        self.controller_destructors[module_ref_name] = destroy_func
        return objects

    def unregisterControllers(self):
        """Destroy controller objects and clear internal registry.

        This will be called at the end of each TestRunner.run call.
        """
        for name, destroy in self.controller_destructors.items():
            try:
                logging.debug("Destroying %s.", name)
                dut = self.controller_destructors[name][0]
                destroy(self.controller_registry[name])
            except:
                logging.exception("Exception occurred destroying %s.", name)
        self.controller_registry = {}
        self.controller_destructors = {}

    def parseTestConfig(self, test_configs):
        """Parses the test configuration and unpacks objects and parameters
        into a dictionary to be passed to test classes.

        Args:
            test_configs: A json object representing the test configurations.
        """
        self.test_run_info[
            keys.ConfigKeys.IKEY_TESTBED_NAME] = self.testbed_name
        # Unpack other params.
        self.test_run_info["registerController"] = self.registerController
        self.test_run_info[keys.ConfigKeys.IKEY_LOG_PATH] = self.log_path
        user_param_pairs = []
        for item in test_configs.items():
            if item[0] not in keys.ConfigKeys.RESERVED_KEYS:
                user_param_pairs.append(item)
        self.test_run_info[keys.ConfigKeys.IKEY_USER_PARAM] = copy.deepcopy(
            dict(user_param_pairs))

    def runTestClass(self, test_cls, test_cases=None):
        """Instantiates and executes a test class.

        If test_cases is None, the test cases listed by self.tests will be
        executed instead. If self.tests is empty as well, no test case in this
        test class will be executed.

        Args:
            test_cls: The test class to be instantiated and executed.
            test_cases: List of test case names to execute within the class.

        Returns:
            A tuple, with the number of cases passed at index 0, and the total
            number of test cases at index 1.
        """
        self.running = True
        with test_cls(self.test_run_info) as test_cls_instance:
            try:
                cls_result = test_cls_instance.run(test_cases)
                self.results += cls_result
            except signals.TestAbortAll as e:
                self.results += e.results
                raise e

    def run(self):
        """Executes test cases.

        This will instantiate controller and test classes, and execute test
        classes. This can be called multiple times to repeatly execute the
        requested test cases.

        A call to TestRunner.stop should eventually happen to conclude the life
        cycle of a TestRunner.

        Args:
            test_classes: A dictionary where the key is test class name, and
                          the values are actual test classes.
        """
        if not self.running:
            self.running = True
        # Initialize controller objects and pack appropriate objects/params
        # to be passed to test class.
        self.parseTestConfig(self.test_configs)
        t_configs = self.test_configs[keys.ConfigKeys.KEY_TEST_PATHS]
        test_classes = self.importTestModules(t_configs)
        logging.debug("Executing run list %s.", self.run_list)
        try:
            for test_cls_name, test_case_names in self.run_list:
                if not self.running:
                    break
                if test_case_names:
                    logging.debug("Executing test cases %s in test class %s.",
                                  test_case_names, test_cls_name)
                else:
                    logging.debug("Executing test class %s", test_cls_name)
                try:
                    test_cls = test_classes[test_cls_name]
                except KeyError:
                    raise USERError(
                        ("Unable to locate class %s in any of the test "
                         "paths specified.") % test_cls_name)
                try:
                    self.runTestClass(test_cls, test_case_names)
                except signals.TestAbortAll as e:
                    logging.warning(
                        ("Abort all subsequent test classes. Reason: "
                         "%s"), e)
                    raise
        finally:
            self.unregisterControllers()

    def stop(self):
        """Releases resources from test run. Should always be called after
        TestRunner.run finishes.

        This function concludes a test run and writes out a test report.
        """
        if self.running:
            msg = "\nSummary for test run %s: %s\n" % (self.id,
                                                       self.results.summary())
            self._writeResultsJsonString()
            logging.info(msg.strip())
            logger.killTestLogger(logging.getLogger())
            self.running = False

    def _writeResultsJsonString(self):
        """Writes out a json file with the test result info for easy parsing.
        """
        path = os.path.join(self.log_path, "test_run_summary.json")
        with open(path, 'w') as f:
            f.write(self.results.jsonString())