普通文本  |  570行  |  19.97 KB

#!/usr/bin/python
# pylint: disable=missing-docstring

import logging
import os
import shutil
import StringIO
import sys
import tempfile
import unittest

import common
from autotest_lib.client.bin import job, sysinfo, harness
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import logging_manager, logging_config
from autotest_lib.client.common_lib import base_job_unittest
from autotest_lib.client.common_lib.test_utils import mock


class job_test_case(unittest.TestCase):
    """Generic job TestCase class that defines a standard job setUp and
    tearDown, with some standard stubs."""

    job_class = job.base_client_job

    def setUp(self):
        self.god = mock.mock_god(ut=self)
        self.god.stub_with(job.base_client_job, '_get_environ_autodir',
                           classmethod(lambda cls: '/adir'))
        self.job = self.job_class.__new__(self.job_class)
        self.job._job_directory = base_job_unittest.stub_job_directory

        _, self.control_file = tempfile.mkstemp()


    def tearDown(self):
        self.god.unstub_all()
        os.remove(self.control_file)


class test_find_base_directories(
        base_job_unittest.test_find_base_directories.generic_tests,
        job_test_case):

    def test_autodir_equals_clientdir(self):
        autodir, clientdir, _ = self.job._find_base_directories()
        self.assertEqual(autodir, '/adir')
        self.assertEqual(clientdir, '/adir')


    def test_serverdir_is_none(self):
        _, _, serverdir = self.job._find_base_directories()
        self.assertEqual(serverdir, None)


class abstract_test_init(base_job_unittest.test_init.generic_tests):
    """Generic client job mixin used when defining variations on the
    job.__init__ generic tests."""
    OPTIONAL_ATTRIBUTES = (
        base_job_unittest.test_init.generic_tests.OPTIONAL_ATTRIBUTES
        - set(['control', 'harness']))


class test_init_minimal_options(abstract_test_init, job_test_case):

    def call_init(self):
        # TODO(jadmanski): refactor more of the __init__ code to not need to
        # stub out countless random APIs
        self.god.stub_function_to_return(job.os, 'mkdir', None)
        self.god.stub_function_to_return(job.os.path, 'exists', True)
        self.god.stub_function_to_return(self.job, '_load_state', None)
        self.god.stub_function_to_return(self.job, 'record', None)
        self.god.stub_function_to_return(job.shutil, 'copyfile', None)
        self.god.stub_function_to_return(job.logging_manager,
                                         'configure_logging', None)
        class manager:
            def start_logging(self):
                return None
        self.god.stub_function_to_return(job.logging_manager,
                                         'get_logging_manager', manager())
        class stub_sysinfo:
            def log_per_reboot_data(self):
                return None
        self.god.stub_function_to_return(job.sysinfo, 'sysinfo',
                                         stub_sysinfo())
        class stub_harness:
            run_start = lambda self: None
        self.god.stub_function_to_return(job.harness, 'select', stub_harness())
        class options:
            tag = ''
            verbose = False
            cont = False
            harness = 'stub'
            harness_args = None
            hostname = None
            user = None
            log = False
            args = ''
            output_dir = ''
        self.god.stub_function_to_return(job.utils, 'drop_caches', None)

        self.job._job_state = base_job_unittest.stub_job_state
        self.job.__init__(self.control_file, options)


class dummy(object):
    """A simple placeholder for attributes"""
    pass


class first_line_comparator(mock.argument_comparator):
    def __init__(self, first_line):
        self.first_line = first_line


    def is_satisfied_by(self, parameter):
        return self.first_line == parameter.splitlines()[0]


class test_base_job(unittest.TestCase):
    def setUp(self):
        # make god
        self.god = mock.mock_god(ut=self)

        # need to set some environ variables
        self.autodir = "autodir"
        os.environ['AUTODIR'] = self.autodir

        # set up some variables
        _, self.control = tempfile.mkstemp()
        self.jobtag = "jobtag"

        # get rid of stdout and logging
        sys.stdout = StringIO.StringIO()
        logging_manager.configure_logging(logging_config.TestingConfig())
        logging.disable(logging.CRITICAL)
        def dummy_configure_logging(*args, **kwargs):
            pass
        self.god.stub_with(logging_manager, 'configure_logging',
                           dummy_configure_logging)
        real_get_logging_manager = logging_manager.get_logging_manager
        def get_logging_manager_no_fds(manage_stdout_and_stderr=False,
                                       redirect_fds=False):
            return real_get_logging_manager(manage_stdout_and_stderr, False)
        self.god.stub_with(logging_manager, 'get_logging_manager',
                           get_logging_manager_no_fds)

        # stub out some stuff
        self.god.stub_function(os.path, 'exists')
        self.god.stub_function(os.path, 'isdir')
        self.god.stub_function(os, 'makedirs')
        self.god.stub_function(os, 'mkdir')
        self.god.stub_function(os, 'remove')
        self.god.stub_function(shutil, 'rmtree')
        self.god.stub_function(shutil, 'copyfile')
        self.god.stub_function(job, 'open')
        self.god.stub_function(utils, 'system')
        self.god.stub_function(utils, 'drop_caches')
        self.god.stub_function(harness, 'select')
        self.god.stub_function(sysinfo, 'log_per_reboot_data')

        self.god.stub_class(job.local_host, 'LocalHost')
        self.god.stub_class(sysinfo, 'sysinfo')

        self.god.stub_class_method(job.base_client_job,
                                   '_cleanup_debugdir_files')
        self.god.stub_class_method(job.base_client_job, '_cleanup_results_dir')

        self.god.stub_with(job.base_job.job_directory, '_ensure_valid',
                           lambda *_: None)


    def tearDown(self):
        sys.stdout = sys.__stdout__
        self.god.unstub_all()
        os.remove(self.control)


    def _setup_pre_record_init(self, cont):
        self.god.stub_function(self.job, '_load_state')

        resultdir = os.path.join(self.autodir, 'results', self.jobtag)
        tmpdir = os.path.join(self.autodir, 'tmp')
        if not cont:
            job.base_client_job._cleanup_debugdir_files.expect_call()
            job.base_client_job._cleanup_results_dir.expect_call()

        self.job._load_state.expect_call()

        my_harness = self.god.create_mock_class(harness.harness,
                                                'my_harness')
        harness.select.expect_call(None,
                                   self.job,
                                   None).and_return(my_harness)

        return resultdir, my_harness


    def _setup_post_record_init(self, cont, resultdir, my_harness):
        # now some specific stubs
        self.god.stub_function(self.job, 'config_get')
        self.god.stub_function(self.job, 'config_set')
        self.god.stub_function(self.job, 'record')

        # other setup
        results = os.path.join(self.autodir, 'results')
        download = os.path.join(self.autodir, 'tests', 'download')
        pkgdir = os.path.join(self.autodir, 'packages')

        utils.drop_caches.expect_call()
        job_sysinfo = sysinfo.sysinfo.expect_new(resultdir)
        if not cont:
            os.path.exists.expect_call(download).and_return(False)
            os.mkdir.expect_call(download)
            shutil.copyfile.expect_call(mock.is_string_comparator(),
                                 os.path.join(resultdir, 'control'))

        job.local_host.LocalHost.expect_new(hostname='localhost')
        job_sysinfo.log_per_reboot_data.expect_call()
        if not cont:
            self.job.record.expect_call('START', None, None)

        my_harness.run_start.expect_call()


    def construct_job(self, cont):
        # will construct class instance using __new__
        self.job = job.base_client_job.__new__(job.base_client_job)

        # record
        resultdir, my_harness = self._setup_pre_record_init(cont)
        self._setup_post_record_init(cont, resultdir, my_harness)

        # finish constructor
        options = dummy()
        options.tag = self.jobtag
        options.cont = cont
        options.harness = None
        options.harness_args = None
        options.log = False
        options.verbose = False
        options.hostname = 'localhost'
        options.user = 'my_user'
        options.args = ''
        options.output_dir = ''
        self.job.__init__(self.control, options)

        # check
        self.god.check_playback()


    def get_partition_mock(self, devname):
        """
        Create a mock of a partition object and return it.
        """
        class mock(object):
            device = devname
            get_mountpoint = self.god.create_mock_function('get_mountpoint')
        return mock


    def test_constructor_first_run(self):
        self.construct_job(False)


    def test_constructor_continuation(self):
        self.construct_job(True)


    def test_constructor_post_record_failure(self):
        """
        Test post record initialization failure.
        """
        self.job = job.base_client_job.__new__(job.base_client_job)
        options = dummy()
        options.tag = self.jobtag
        options.cont = False
        options.harness = None
        options.harness_args = None
        options.log = False
        options.verbose = False
        options.hostname = 'localhost'
        options.user = 'my_user'
        options.args = ''
        options.output_dir = ''
        error = Exception('fail')

        self.god.stub_function(self.job, '_post_record_init')
        self.god.stub_function(self.job, 'record')

        self._setup_pre_record_init(False)
        self.job._post_record_init.expect_call(
                self.control, options, True).and_raises(error)
        self.job.record.expect_call(
                'ABORT', None, None,'client.bin.job.__init__ failed: %s' %
                str(error))

        self.assertRaises(
                Exception, self.job.__init__, self.control, options,
                drop_caches=True)

        # check
        self.god.check_playback()


    def test_control_functions(self):
        self.construct_job(True)
        control_file = "blah"
        self.job.control_set(control_file)
        self.assertEquals(self.job.control_get(), os.path.abspath(control_file))


    def test_harness_select(self):
        self.construct_job(True)

        # record
        which = "which"
        harness_args = ''
        harness.select.expect_call(which, self.job, 
                                   harness_args).and_return(None)

        # run and test
        self.job.harness_select(which, harness_args)
        self.god.check_playback()


    def test_setup_dirs_raise(self):
        self.construct_job(True)

        # setup
        results_dir = 'foo'
        tmp_dir = 'bar'

        # record
        os.path.exists.expect_call(tmp_dir).and_return(True)
        os.path.isdir.expect_call(tmp_dir).and_return(False)

        # test
        self.assertRaises(ValueError, self.job.setup_dirs, results_dir, tmp_dir)
        self.god.check_playback()


    def test_setup_dirs(self):
        self.construct_job(True)

        # setup
        results_dir1 = os.path.join(self.job.resultdir, 'build')
        results_dir2 = os.path.join(self.job.resultdir, 'build.2')
        results_dir3 = os.path.join(self.job.resultdir, 'build.3')
        tmp_dir = 'bar'

        # record
        os.path.exists.expect_call(tmp_dir).and_return(False)
        os.mkdir.expect_call(tmp_dir)
        os.path.isdir.expect_call(tmp_dir).and_return(True)
        os.path.exists.expect_call(results_dir1).and_return(True)
        os.path.exists.expect_call(results_dir2).and_return(True)
        os.path.exists.expect_call(results_dir3).and_return(False)
        os.path.exists.expect_call(results_dir3).and_return(False)
        os.mkdir.expect_call(results_dir3)

        # test
        self.assertEqual(self.job.setup_dirs(None, tmp_dir),
                         (results_dir3, tmp_dir))
        self.god.check_playback()


    def test_run_test_logs_test_error_from_unhandled_error(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # create an unhandled error object
        class MyError(error.TestError):
            pass
        real_error = MyError("this is the real error message")
        unhandled_error = error.UnhandledTestError(real_error)

        # set up the recording
        testname = "error_test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=None)
        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
            unhandled_error)
        self.job.record.expect_call("ERROR", testname, testname,
                                    first_line_comparator(str(real_error)))
        self.job.record.expect_call("END ERROR", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname)
        self.god.check_playback()


    def test_run_test_logs_non_test_error_from_unhandled_error(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # create an unhandled error object
        class MyError(Exception):
            pass
        real_error = MyError("this is the real error message")
        unhandled_error = error.UnhandledTestError(real_error)
        reason = first_line_comparator("Unhandled MyError: %s" % real_error)

        # set up the recording
        testname = "error_test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=None)
        self.job._runtest.expect_call(testname, "", None, (), {}).and_raises(
            unhandled_error)
        self.job.record.expect_call("ERROR", testname, testname, reason)
        self.job.record.expect_call("END ERROR", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname)
        self.god.check_playback()


    def test_report_reboot_failure(self):
        self.construct_job(True)

        # record
        self.job.record.expect_call("ABORT", "sub", "reboot.verify",
                                    "boot failure")
        self.job.record.expect_call("END ABORT", "sub", "reboot",
                                    optional_fields={"kernel": "2.6.15-smp"})

        # playback
        self.job._record_reboot_failure("sub", "reboot.verify", "boot failure",
                                        running_id="2.6.15-smp")
        self.god.check_playback()


    def _setup_check_post_reboot(self, mount_info, cpu_count):
        # setup
        self.god.stub_function(job.partition_lib, "get_partition_list")
        self.god.stub_function(utils, "count_cpus")

        part_list = [self.get_partition_mock("/dev/hda1"),
                     self.get_partition_mock("/dev/hdb1")]
        mount_list = ["/mnt/hda1", "/mnt/hdb1"]

        # record
        job.partition_lib.get_partition_list.expect_call(
                self.job, exclude_swap=False).and_return(part_list)
        for i in xrange(len(part_list)):
            part_list[i].get_mountpoint.expect_call().and_return(mount_list[i])
        if cpu_count is not None:
            utils.count_cpus.expect_call().and_return(cpu_count)
        self.job._state.set('client', 'mount_info', mount_info)
        self.job._state.set('client', 'cpu_count', 8)


    def test_check_post_reboot_success(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1"),
                          ("/dev/hdb1", "/mnt/hdb1")])
        self._setup_check_post_reboot(mount_info, 8)

        # playback
        self.job._check_post_reboot("sub")
        self.god.check_playback()


    def test_check_post_reboot_mounts_failure(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1")])
        self._setup_check_post_reboot(mount_info, None)

        self.god.stub_function(self.job, "_record_reboot_failure")
        self.job._record_reboot_failure.expect_call("sub",
                "reboot.verify_config", "mounted partitions are different after"
                " reboot (old entries: set([]), new entries: set([('/dev/hdb1',"
                " '/mnt/hdb1')]))", running_id=None)

        # playback
        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
        self.god.check_playback()


    def test_check_post_reboot_cpu_failure(self):
        self.construct_job(True)

        mount_info = set([("/dev/hda1", "/mnt/hda1"),
                          ("/dev/hdb1", "/mnt/hdb1")])
        self._setup_check_post_reboot(mount_info, 4)

        self.god.stub_function(self.job, "_record_reboot_failure")
        self.job._record_reboot_failure.expect_call(
            'sub', 'reboot.verify_config',
            'Number of CPUs changed after reboot (old count: 8, new count: 4)',
            running_id=None)

        # playback
        self.assertRaises(error.JobError, self.job._check_post_reboot, "sub")
        self.god.check_playback()


    def test_parse_args(self):
        test_set = {"a='foo bar baz' b='moo apt'":
                    ["a='foo bar baz'", "b='moo apt'"],
                    "a='foo bar baz' only=gah":
                    ["a='foo bar baz'", "only=gah"],
                    "a='b c d' no=argh":
                    ["a='b c d'", "no=argh"]}
        for t in test_set:
            parsed_args = job.base_client_job._parse_args(t)
            expected_args = test_set[t]
            self.assertEqual(parsed_args, expected_args)


    def test_run_test_timeout_parameter_is_propagated(self):
        self.construct_job(True)

        # set up stubs
        self.god.stub_function(self.job.pkgmgr, 'get_package_name')
        self.god.stub_function(self.job, "_runtest")

        # create an unhandled error object
        #class MyError(error.TestError):
        #    pass
        #real_error = MyError("this is the real error message")
        #unhandled_error = error.UnhandledTestError(real_error)

        # set up the recording
        testname = "test"
        outputdir = os.path.join(self.job.resultdir, testname)
        self.job.pkgmgr.get_package_name.expect_call(
            testname, 'test').and_return(("", testname))
        os.path.exists.expect_call(outputdir).and_return(False)
        timeout = 60
        optional_fields = {}
        optional_fields['timeout'] = timeout
        self.job.record.expect_call("START", testname, testname,
                                    optional_fields=optional_fields)
        self.job._runtest.expect_call(testname, "", timeout, (), {})
        self.job.record.expect_call("GOOD", testname, testname,
                                    "completed successfully")
        self.job.record.expect_call("END GOOD", testname, testname)
        self.job.harness.run_test_complete.expect_call()
        utils.drop_caches.expect_call()

        # run and check
        self.job.run_test(testname, timeout=timeout)
        self.god.check_playback()


if __name__ == "__main__":
    unittest.main()