普通文本  |  864行  |  35.98 KB

#!/usr/bin/python
#
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.


"""Unit tests for server/cros/dynamic_suite/dynamic_suite.py."""

import collections
from collections import OrderedDict
import os
import shutil
import tempfile
import unittest

import mock
import mox

import common

from autotest_lib.client.common_lib import base_job
from autotest_lib.client.common_lib import control_data
from autotest_lib.client.common_lib import error
from autotest_lib.client.common_lib import priorities
from autotest_lib.client.common_lib import utils
from autotest_lib.client.common_lib.cros import dev_server
from autotest_lib.server import frontend
from autotest_lib.server.cros import provision
from autotest_lib.server.cros.dynamic_suite import control_file_getter
from autotest_lib.server.cros.dynamic_suite import constants
from autotest_lib.server.cros.dynamic_suite import job_status
from autotest_lib.server.cros.dynamic_suite import suite as SuiteBase
from autotest_lib.server.cros.dynamic_suite.comparators import StatusContains
from autotest_lib.server.cros.dynamic_suite.fakes import FakeControlData
from autotest_lib.server.cros.dynamic_suite.fakes import FakeJob
from autotest_lib.server.cros.dynamic_suite.suite import RetryHandler
from autotest_lib.server.cros.dynamic_suite.suite import Suite


class SuiteTest(mox.MoxTestBase):
    """Unit tests for dynamic_suite Suite class.

    @var _BOARD: fake board label; used as the meta host and appended to the
                 dependencies of every job expected by expect_job_scheduling
    @var _BUILDS: fake build
    @var _TAG: fake suite tag
    @var _ATTR: fake set of test attributes given to the fake control files
    @var _DEVSERVER_HOST: fake devserver URL handed to dev_server.ImageServer
    @var _FAKE_JOB_ID: fake job id used by the job retry tests
    """

    _BOARD = 'board:board'
    _BUILDS = {provision.CROS_VERSION_PREFIX:'build_1',
               provision.FW_RW_VERSION_PREFIX:'fwrw_build_1'}
    _TAG = 'au'
    _ATTR = {'attr:attr'}
    _DEVSERVER_HOST = 'http://dontcare:8080'
    _FAKE_JOB_ID = 10


    def setUp(self):
        """Create the mocked AFE/TKO/getter and the fake control files."""
        super(SuiteTest, self).setUp()
        self.maxDiff = None

        # Remember the module-level batch switch so tearDown can restore it,
        # then force the non-batch code path for the duration of the test.
        self.use_batch = SuiteBase.ENABLE_CONTROLS_IN_BATCH
        SuiteBase.ENABLE_CONTROLS_IN_BATCH = False

        self.afe = self.mox.CreateMock(frontend.AFE)
        self.tko = self.mox.CreateMock(frontend.TKO)

        self.tmpdir = tempfile.mkdtemp(suffix=type(self).__name__)

        self.getter = self.mox.CreateMock(control_file_getter.ControlFileGetter)
        self.devserver = dev_server.ImageServer(self._DEVSERVER_HOST)

        tag = self._TAG
        attr = self._ATTR

        # Insertion order is significant: expect_job_scheduling() hands out
        # fake job ids while walking this dict in order.
        fake_files = OrderedDict()
        fake_files['one'] = FakeControlData(
                tag, attr, 'data_one', 'FAST', job_retries=None)
        fake_files['two'] = FakeControlData(
                tag, attr, 'data_two', 'SHORT', dependencies=['feta'])
        fake_files['three'] = FakeControlData(
                tag, attr, 'data_three', 'MEDIUM')
        fake_files['four'] = FakeControlData(
                'other', attr, 'data_four', 'LONG', dependencies=['arugula'])
        fake_files['five'] = FakeControlData(
                tag, {'other'}, 'data_five', 'LONG',
                dependencies=['arugula', 'caligula'])
        fake_files['six'] = FakeControlData(
                tag, attr, 'data_six', 'LENGTHY')
        fake_files['seven'] = FakeControlData(
                tag, attr, 'data_seven', 'FAST', job_retries=1)
        self.files = fake_files

        # Paths that are listed by the getter but never expected to be parsed.
        self.files_to_filter = {}
        for filtered_path in ('with/deps/...', 'with/profilers/...'):
            self.files_to_filter[filtered_path] = FakeControlData(
                    tag, attr, 'gets filtered')


    def tearDown(self):
        """Restore the batch switch and remove the temporary directory.

        The temporary directory is removed in a finally clause so that it does
        not leak when the base class tearDown raises.
        """
        SuiteBase.ENABLE_CONTROLS_IN_BATCH = self.use_batch
        try:
            super(SuiteTest, self).tearDown()
        finally:
            shutil.rmtree(self.tmpdir, ignore_errors=True)


    def expect_control_file_parsing(self, suite_name=_TAG):
        """Expect the 'control files' in |self.files| to be fetched and parsed.

        The getter is expected to list both |self.files| and
        |self.files_to_filter|, but only |self.files| are expected to be
        parsed.

        @param suite_name: The suite name to parse control files for.
        """
        listed_files = list(self.files.keys())
        listed_files.extend(self.files_to_filter.keys())
        self._set_control_file_parsing_expectations(
                False, listed_files, self.files, suite_name)


    def _set_control_file_parsing_expectations(self, already_stubbed,
                                               file_list, files_to_parse,
                                               suite_name):
        """Record the getter and parser calls expected for |files_to_parse|.

        @param already_stubbed: parse_control_string already stubbed out.
        @param file_list: the files the dev server returns
        @param files_to_parse: the {'name': FakeControlData} dict of files we
                               expect to get parsed.
        @param suite_name: the suite name the control file list is expected
                           to be requested for.
        """
        need_stub = not already_stubbed
        if need_stub:
            self.mox.StubOutWithMock(control_data, 'parse_control_string')

        list_call = self.getter.get_control_file_list(suite_name=suite_name)
        list_call.AndReturn(file_list)

        # For every file: first its contents are fetched, then parsed. Both
        # expectations are order-agnostic.
        for control_path, fake_data in files_to_parse.items():
            fetch_call = self.getter.get_control_file_contents(control_path)
            fetch_call.InAnyOrder().AndReturn(fake_data.string)
            parse_call = control_data.parse_control_string(
                    fake_data.string,
                    raise_warnings=True,
                    path=control_path)
            parse_call.InAnyOrder().AndReturn(fake_data)


    def expect_control_file_parsing_in_batch(self, suite_name=_TAG):
        """Expect control file contents to be fetched in one batch call.

        Replaces |self.getter| with a mocked DevServerGetter whose
        get_suite_info() returns a {path: contents} dict covering
        |self.files| and |self.files_to_filter|. Only the entries of
        |self.files| are expected to be parsed.

        @param suite_name: The suite name to parse control files for.
        """
        self.getter = self.mox.CreateMock(control_file_getter.DevServerGetter)
        self.mox.StubOutWithMock(control_data, 'parse_control_string')

        batch_contents = {}
        for control_path, fake_data in self.files.items():
            batch_contents[control_path] = fake_data.string
            parse_call = control_data.parse_control_string(
                    fake_data.string,
                    raise_warnings=True,
                    path=control_path)
            parse_call.InAnyOrder().AndReturn(fake_data)
        for control_path, fake_data in self.files_to_filter.items():
            batch_contents[control_path] = fake_data.string

        self.getter._dev_server = self._DEVSERVER_HOST
        info_call = self.getter.get_suite_info(suite_name=suite_name)
        info_call.AndReturn(batch_contents)


    def testFindAllTestInBatch(self):
        """Test switch on enable_getting_controls_in_batch for function
        find_all_test.

        With ENABLE_CONTROLS_IN_BATCH turned on, the control file contents
        are expected to come from a single get_suite_info() call.
        """
        self.expect_control_file_parsing_in_batch()
        # Do not overwrite self.use_batch here: setUp() stored the original
        # value of the switch there (and already forced the switch to False),
        # and tearDown() restores the switch from it. Re-saving it at this
        # point would make tearDown() restore False instead of the original.
        SuiteBase.ENABLE_CONTROLS_IN_BATCH = True

        self.mox.ReplayAll()

        predicate = lambda d: d.suite == self._TAG
        tests = SuiteBase.find_and_parse_tests(self.getter,
                                               predicate,
                                               self._TAG)
        self.assertEqual(len(tests), 6)
        for name in ('one', 'two', 'three', 'five', 'six', 'seven'):
            self.assertIn(self.files[name], tests)


    def testFindAndParseStableTests(self):
        """Only the single test matching the predicate should be found."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        wanted = self.files['two']

        def has_wanted_text(control):
            """Match the control data whose text is that of test 'two'."""
            return control.text == wanted.string

        found = SuiteBase.find_and_parse_tests(
                self.getter, has_wanted_text, self._TAG)
        self.assertEqual(len(found), 1)
        self.assertEqual(found[0], wanted)


    def testFindSuiteSyntaxErrors(self):
        """Check all control files for syntax errors.

        Nothing is mocked here: the directory three levels above this file is
        scanned with a file system getter and the un-forgiving parser, while
        pretending to look for every control file that has a suite attribute.
        """
        this_dir = os.path.dirname(__file__)
        autodir = os.path.abspath(os.path.join(this_dir, '..', '..', '..'))
        fs_getter = SuiteBase.create_fs_getter(autodir)

        def has_suite(test):
            """Accept any control data carrying a 'suite' attribute."""
            return hasattr(test, 'suite')

        SuiteBase.find_and_parse_tests(fs_getter, has_suite,
                                       forgiving_parser=False)


    def testFindAndParseTestsSuite(self):
        """Should find every test whose suite equals the fake suite tag."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        def in_suite(control):
            """Match control data belonging to the fake suite."""
            return control.suite == self._TAG

        found = SuiteBase.find_and_parse_tests(
                self.getter, in_suite, self._TAG)
        self.assertEqual(len(found), 6)
        # 'four' is the only fake control file created with another suite.
        for name in ('one', 'two', 'three', 'five', 'six', 'seven'):
            self.assertIn(self.files[name], found)


    def testFindAndParseTestsAttr(self):
        """Should find every test matching an attribute expression."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        attr_predicate = SuiteBase.matches_attribute_expression_predicate(
                'attr:attr')
        found = SuiteBase.find_and_parse_tests(
                self.getter, attr_predicate, self._TAG)
        self.assertEqual(len(found), 6)
        # 'five' is the only fake control file created with other attributes.
        for name in ('one', 'two', 'three', 'four', 'six', 'seven'):
            self.assertIn(self.files[name], found)


    def testAdHocSuiteCreation(self):
        """Should be able to schedule an ad-hoc suite by specifying
        a single test name."""
        self.expect_control_file_parsing(suite_name='ad_hoc_suite')
        self.mox.ReplayAll()

        only_five = SuiteBase.test_name_equals_predicate('name-data_five')
        suite = Suite.create_from_predicates(
                [only_five], self._BUILDS, self._BOARD,
                devserver=None,
                cf_getter=self.getter,
                afe=self.afe, tko=self.tko)

        for excluded in ('one', 'two', 'four'):
            self.assertNotIn(self.files[excluded], suite.tests)
        self.assertIn(self.files['five'], suite.tests)


    def mock_control_file_parsing(self):
        """Stub find_and_parse_tests() to return the tests in |self.files|.

        Exactly one call is expected; its positional arguments are ignored,
        its keyword arguments are not.
        """
        # Mimic parsing: parsed control data exposes its contents as |text|.
        for fake_test in self.files.values():
            fake_test.text = fake_test.string

        self.mox.StubOutWithMock(SuiteBase, 'find_and_parse_tests')
        expected_call = SuiteBase.find_and_parse_tests(
                mox.IgnoreArg(),
                mox.IgnoreArg(),
                mox.IgnoreArg(),
                forgiving_parser=True,
                run_prod_code=False,
                test_args=None)
        expected_call.AndReturn(self.files.values())


    def expect_job_scheduling(self, recorder,
                              tests_to_skip=None, ignore_deps=False,
                              raises=False, suite_deps=None, suite=None,
                              extra_keyvals=None):
        """Expect jobs to be scheduled for 'tests' in |self.files|.

        @param recorder: object with a record_entry to be used to record test
                         results.
        @param tests_to_skip: [list, of, test, names] that we expect to skip.
                              None (the default) means no test is skipped.
        @param ignore_deps: If true, ignore tests' dependencies.
        @param raises: If True, expect every create_job call to raise
                       NoEligibleHostException and a START/TEST_NA/END
                       triple to be recorded for the test.
        @param suite_deps: List of suite level dependencies expected on every
                           job. None (the default) means no such dependency.
        @param suite: Optional suite object. When it is given and has a
                      results directory, _remember_job_keyval is stubbed out
                      and expected to be called for every created job.
        @param extra_keyvals: Extra keyvals set to tests. None (the default)
                              means no extra keyvals.
        """
        # None defaults instead of mutable [] / {} default arguments, which
        # would be shared between calls.
        if tests_to_skip is None:
            tests_to_skip = []
        if suite_deps is None:
            suite_deps = []
        if extra_keyvals is None:
            extra_keyvals = {}

        record_job_id = suite and suite._results_dir
        if record_job_id:
            self.mox.StubOutWithMock(suite, '_remember_job_keyval')
        recorder.record_entry(
            StatusContains.CreateFromStrings('INFO', 'Start %s' % self._TAG),
            log_in_subdir=False)
        tests = self.files.values()
        # Fake job ids are handed out sequentially, starting at 1, to the jobs
        # that are expected to be created successfully.
        n = 1
        for test in tests:
            if test.name in tests_to_skip:
                continue
            dependencies = []
            if not ignore_deps:
                dependencies.extend(test.dependencies)
            if suite_deps:
                dependencies.extend(suite_deps)
            dependencies.append(self._BOARD)
            build = self._BUILDS[provision.CROS_VERSION_PREFIX]
            keyvals = {
                'build': build,
                'suite': self._TAG,
                'builds': SuiteTest._BUILDS,
                'experimental': test.experimental,
            }
            keyvals.update(extra_keyvals)
            job_mock = self.afe.create_job(
                control_file=test.text,
                name=mox.And(mox.StrContains(build),
                             mox.StrContains(test.name)),
                control_type=mox.IgnoreArg(),
                meta_hosts=[self._BOARD],
                dependencies=dependencies,
                keyvals=keyvals,
                max_runtime_mins=24*60,
                timeout_mins=1440,
                parent_job_id=None,
                test_retry=0,
                reboot_before=mox.IgnoreArg(),
                run_reset=mox.IgnoreArg(),
                priority=priorities.Priority.DEFAULT,
                synch_count=test.sync_count,
                require_ssp=test.require_ssp
                )
            if raises:
                job_mock.AndRaise(error.NoEligibleHostException())
                recorder.record_entry(
                        StatusContains.CreateFromStrings('START', test.name),
                        log_in_subdir=False)
                recorder.record_entry(
                        StatusContains.CreateFromStrings('TEST_NA', test.name),
                        log_in_subdir=False)
                recorder.record_entry(
                        StatusContains.CreateFromStrings('END', test.name),
                        log_in_subdir=False)
            else:
                fake_job = FakeJob(id=n)
                job_mock.AndReturn(fake_job)
                if record_job_id:
                    suite._remember_job_keyval(fake_job)
                n += 1


    def testScheduleTestsAndRecord(self):
        """Should schedule stable and experimental tests with the AFE.

        The suite is created with a results directory and handed to
        expect_job_scheduling(), which can then also expect the created jobs
        to be remembered by the suite.
        """
        scheduled_names = ['name-data_one', 'name-data_two',
                           'name-data_three', 'name-data_four',
                           'name-data_five', 'name-data_six',
                           'name-data_seven']
        expected_keyvals = {
            constants.SCHEDULED_TEST_COUNT_KEY: 7,
            constants.SCHEDULED_TEST_NAMES_KEY: repr(scheduled_names),
        }

        # Phase 1: create the suite against the stubbed control file parser.
        self.mock_control_file_parsing()
        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                afe=self.afe, tko=self.tko, results_dir=self.tmpdir)

        # Phase 2: reset the mocks, then record the scheduling expectations.
        self.mox.ResetAll()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(status_recorder, suite=suite)

        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(self.tmpdir, expected_keyvals)
        self.mox.ReplayAll()

        suite.schedule(status_recorder.record_entry)
        for scheduled_job in suite._jobs:
            self.assertTrue(hasattr(scheduled_job, 'test_name'))


    def testScheduleTests(self):
        """Should schedule one AFE job per test and write the keyvals."""
        scheduled_names = ['name-data_one', 'name-data_two',
                           'name-data_three', 'name-data_four',
                           'name-data_five', 'name-data_six',
                           'name-data_seven']
        expected_keyvals = {
            constants.SCHEDULED_TEST_COUNT_KEY: len(scheduled_names),
            constants.SCHEDULED_TEST_NAMES_KEY: repr(scheduled_names),
        }

        self.mock_control_file_parsing()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(status_recorder)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        # No results_dir is given to the suite, hence the None directory.
        utils.write_keyval(None, expected_keyvals)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                afe=self.afe, tko=self.tko)
        suite.schedule(status_recorder.record_entry)


    def testScheduleTestsIgnoreDeps(self):
        """Jobs should be scheduled without the tests' own dependencies."""
        scheduled_names = ['name-data_one', 'name-data_two',
                           'name-data_three', 'name-data_four',
                           'name-data_five', 'name-data_six',
                           'name-data_seven']
        expected_keyvals = {
            constants.SCHEDULED_TEST_COUNT_KEY: len(scheduled_names),
            constants.SCHEDULED_TEST_NAMES_KEY: repr(scheduled_names),
        }

        self.mock_control_file_parsing()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(status_recorder, ignore_deps=True)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, expected_keyvals)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                afe=self.afe, tko=self.tko,
                ignore_deps=True)
        suite.schedule(status_recorder.record_entry)


    def testScheduleUnrunnableTestsTESTNA(self):
        """Tests which fail to schedule should be TEST_NA."""
        # Every create_job call is expected to raise, so no test ends up
        # scheduled: the count is zero and the name list is empty.
        scheduled_names = []
        expected_keyvals = {
            constants.SCHEDULED_TEST_COUNT_KEY: 0,
            constants.SCHEDULED_TEST_NAMES_KEY: repr(scheduled_names),
        }

        self.mock_control_file_parsing()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(status_recorder, raises=True)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, expected_keyvals)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                afe=self.afe, tko=self.tko)
        suite.schedule(status_recorder.record_entry)


    def testRetryMapAfterScheduling(self):
        """Test job-test and test-job mapping are correctly updated."""
        scheduled_names = ['name-data_one', 'name-data_two',
                           'name-data_three', 'name-data_four',
                           'name-data_five', 'name-data_six',
                           'name-data_seven']
        expected_keyvals = {
            constants.SCHEDULED_TEST_COUNT_KEY: 7,
            constants.SCHEDULED_TEST_NAMES_KEY: repr(scheduled_names),
        }

        self.mock_control_file_parsing()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(status_recorder)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, expected_keyvals)

        # self.files is ordered, and expect_job_scheduling() hands out job ids
        # 1, 2, ... in that same order, so the position of a test in
        # self.files (counted from 1) is the id of its job.
        expected_retry_map = {}
        for job_id, test in enumerate(self.files.values(), 1):
            retry_max = test.job_retries
            if retry_max is None:
                retry_max = 1
            if retry_max > 0:
                expected_retry_map[job_id] = {
                        'state': RetryHandler.States.NOT_ATTEMPTED,
                        'retry_max': retry_max}

        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                afe=self.afe, tko=self.tko,
                job_retry=True)
        suite.schedule(status_recorder.record_entry)

        self.assertEqual(expected_retry_map, suite._retry_handler._retry_map)


    def testSuiteMaxRetries(self):
        """The suite-wide retry budget should shrink when a retry is added."""
        scheduled_names = ['name-data_one', 'name-data_two',
                           'name-data_three', 'name-data_four',
                           'name-data_five', 'name-data_six',
                           'name-data_seven']
        expected_keyvals = {
            constants.SCHEDULED_TEST_COUNT_KEY: 7,
            constants.SCHEDULED_TEST_NAMES_KEY: repr(scheduled_names),
        }

        self.mock_control_file_parsing()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(status_recorder)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, expected_keyvals)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                afe=self.afe, tko=self.tko,
                job_retry=True, max_retries=1)
        suite.schedule(status_recorder.record_entry)

        retry_handler = suite._retry_handler
        self.assertEqual(retry_handler._max_retries, 1)
        # Any job id present in the retry map belongs to a retryable test.
        retryable_job_id = next(iter(retry_handler._retry_map))
        retry_handler.add_retry(old_job_id=retryable_job_id, new_job_id=10)
        self.assertEqual(retry_handler._max_retries, 0)


    def testSuiteDependencies(self):
        """Should add suite dependencies to tests scheduled."""
        scheduled_names = ['name-data_one', 'name-data_two',
                           'name-data_three', 'name-data_four',
                           'name-data_five', 'name-data_six',
                           'name-data_seven']
        expected_keyvals = {
            constants.SCHEDULED_TEST_COUNT_KEY: len(scheduled_names),
            constants.SCHEDULED_TEST_NAMES_KEY: repr(scheduled_names),
        }

        self.mock_control_file_parsing()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        # Every expected job carries the suite level dependency 'extra'.
        self.expect_job_scheduling(status_recorder, suite_deps=['extra'])
        self.mox.StubOutWithMock(utils, 'write_keyval')
        utils.write_keyval(None, expected_keyvals)

        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                extra_deps=['extra'],
                afe=self.afe, tko=self.tko)
        suite.schedule(status_recorder.record_entry)


    def testInheritedKeyvals(self):
        """Tests should inherit some whitelisted job keyvals."""
        # Only keyvals in constants.INHERITED_KEYVALS are inherited to tests:
        # 'your' is a job keyval that must not reach the tests' keyvals.
        inherited_keyvals = {
            constants.KEYVAL_CIDB_BUILD_ID: '111',
            constants.KEYVAL_CIDB_BUILD_STAGE_ID: '222',
        }
        job_keyvals = dict(inherited_keyvals)
        job_keyvals['your'] = 'name'

        self.mock_control_file_parsing()
        status_recorder = self.mox.CreateMock(base_job.base_job)
        self.expect_job_scheduling(status_recorder,
                                   extra_keyvals=inherited_keyvals)
        self.mox.StubOutWithMock(utils, 'write_keyval')
        # Two writes are expected: the job keyvals, then one whose content is
        # not checked here.
        utils.write_keyval(None, job_keyvals)
        utils.write_keyval(None, mox.IgnoreArg())

        self.mox.ReplayAll()
        suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                afe=self.afe, tko=self.tko,
                job_keyvals=job_keyvals)
        suite.schedule(status_recorder.record_entry)


    def _createSuiteWithMockedTestsAndControlFiles(self, file_bugs=False):
        """Create a Suite, using mocked tests and control file contents.

        Also stores a fresh _MemoryResultReporter in |self.result_reporter|.
        The mocks are reset before returning, so the caller can record its
        own expectations.

        @param file_bugs: passed through to Suite.create_from_name.

        @return Suite object, after mocking out behavior needed to create it.
        """
        self.result_reporter = _MemoryResultReporter()
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        creation_options = {
            'afe': self.afe,
            'tko': self.tko,
            'file_bugs': file_bugs,
            'job_retry': True,
            'result_reporter': self.result_reporter,
        }
        created_suite = Suite.create_from_name(
                self._TAG, self._BUILDS, self._BOARD, self.devserver,
                self.getter, **creation_options)

        self.mox.ResetAll()
        return created_suite


    def _createSuiteMockResults(self, results_dir=None, result_status='FAIL'):
        """Create |self.suite| and |self.recorder| for one bad test result.

        The recorder is set up to expect the START / result / END entries of
        the bad test.

        @param results_dir: A mock results directory.
        @param result_status: A desired result status, e.g. 'FAIL', 'WARN'.

        @return Two element list: the predicates and the fallout of the bad
                test report.
        """
        suite = self._createSuiteWithMockedTestsAndControlFiles(
                file_bugs=True)
        self.suite = suite
        suite._results_dir = results_dir

        bad_report = self._get_bad_test_report(result_status)

        recorder = self.mox.CreateMock(base_job.base_job)
        self.recorder = recorder
        recorder.record_entry = self.mox.CreateMock(
                base_job.base_job.record_entry)
        self._mock_recorder_with_results([bad_report.predicates], recorder)

        return [bad_report.predicates, bad_report.fallout]


    def _mock_recorder_with_results(self, results, recorder):
        """
        Expects each result to be recorded as three entries, e.g.:
        START, (status, name, reason), END

        @param results: list of results; the first item of a result is its
                        status, the second its test name
        @param recorder: status recorder
        """
        for result in results:
            status, test_name = result[0], result[1]

            start_entry = StatusContains.CreateFromStrings('START', test_name)
            recorder.record_entry(start_entry, log_in_subdir=False)

            result_entry = StatusContains.CreateFromStrings(*result)
            result_call = recorder.record_entry(result_entry,
                                                log_in_subdir=False)
            result_call.InAnyOrder('results')

            end_entry = StatusContains.CreateFromStrings(
                    'END %s' % status, test_name)
            recorder.record_entry(end_entry, log_in_subdir=False)


    def schedule_and_expect_these_results(self, suite, results, recorder):
        """Stub out suite.schedule and patch the job result waiter.

        suite.schedule is replaced by a mox stub expecting one call, the
        suite gets an empty RetryHandler, and
        job_status.JobResultWaiter.wait_for_results is patched (until test
        cleanup) to return a generator over |results|.

        @param suite:    suite object for which to stub out schedule(...)
        @param results:  argument tuples, each of which is turned into a
                         job_status.Status yielded by the patched
                         wait_for_results(...)
        @param recorder: mocked recorder object to replay status messages
        """
        def result_generator(raw_results):
            """Yield every raw result as a job_status.Status object.

            A truthy value sent into the generator is answered with None;
            the sent value itself is ignored.

            @param raw_results: argument tuples for job_status.Status.
            @yield: job_status.Status objects.
            """
            statuses = [job_status.Status(*fields) for fields in raw_results]
            for status in statuses:
                sent = (yield status)
                if sent:
                    yield None

        self.mox.StubOutWithMock(suite, 'schedule')
        suite.schedule(recorder.record_entry)
        suite._retry_handler = RetryHandler({})

        waiter_patch = mock.patch.object(
                job_status.JobResultWaiter, 'wait_for_results', autospec=True)
        patched_wait = waiter_patch.start()
        patched_wait.return_value = result_generator(results)
        self.addCleanup(waiter_patch.stop)


    def testRunAndWaitSuccess(self):
        """Results handed out by the waiter should be recorded."""
        suite = self._createSuiteWithMockedTestsAndControlFiles()
        status_recorder = self.mox.CreateMock(base_job.base_job)

        # One passing and one failing result.
        canned_results = [('GOOD', 'good'), ('FAIL', 'bad', 'reason')]
        self._mock_recorder_with_results(canned_results, status_recorder)
        self.schedule_and_expect_these_results(
                suite, canned_results, status_recorder)
        self.mox.ReplayAll()

        suite.schedule(status_recorder.record_entry)
        suite.wait(status_recorder.record_entry)


    def testRunAndWaitFailure(self):
        """Should record failure to gather results."""
        suite = self._createSuiteWithMockedTestsAndControlFiles()

        status_recorder = self.mox.CreateMock(base_job.base_job)
        waiting_failure = StatusContains.CreateFromStrings(
                'FAIL', self._TAG, 'waiting')
        status_recorder.record_entry(waiting_failure, log_in_subdir=False)

        self.mox.StubOutWithMock(suite, 'schedule')
        suite.schedule(status_recorder.record_entry)
        self.mox.ReplayAll()

        # Make waiting for results blow up.
        waiter_patch = mock.patch.object(
                job_status.JobResultWaiter, 'wait_for_results',
                autospec=True)
        with waiter_patch as wait_mock:
            wait_mock.side_effect = Exception
            suite.schedule(status_recorder.record_entry)
            suite.wait(status_recorder.record_entry)


    def testRunAndWaitScheduleFailure(self):
        """Should record failure to schedule jobs."""
        suite = self._createSuiteWithMockedTestsAndControlFiles()
        status_recorder = self.mox.CreateMock(base_job.base_job)

        # Expected entries, in order: the suite start, then the scheduling
        # failure.
        suite_start = StatusContains.CreateFromStrings(
                'INFO', 'Start %s' % self._TAG)
        status_recorder.record_entry(suite_start, log_in_subdir=False)
        scheduling_failure = StatusContains.CreateFromStrings(
                'FAIL', self._TAG, 'scheduling')
        status_recorder.record_entry(scheduling_failure, log_in_subdir=False)

        # The job creator is expected to be called once, and to raise.
        job_creator = suite._job_creator
        self.mox.StubOutWithMock(job_creator, 'create_job')
        create_call = job_creator.create_job(
                mox.IgnoreArg(), retry_for=mox.IgnoreArg())
        create_call.AndRaise(Exception('Expected during test.'))
        self.mox.ReplayAll()

        suite.schedule(status_recorder.record_entry)
        suite.wait(status_recorder.record_entry)


    def testGetTestsSortedByTime(self):
        """All tests should be found, in non-increasing TIME index order."""
        self.expect_control_file_parsing()
        self.mox.ReplayAll()

        def accept_all(_control):
            """Predicate matching every control file."""
            return True

        found = SuiteBase.find_and_parse_tests(
                self.getter, accept_all, self._TAG)
        self.assertEqual(len(found), 7)

        time_index = control_data.ControlData.get_test_time_index
        times = [time_index(test.time) for test in found]
        # Every time index must be at least as large as the one following it.
        descending = all(earlier >= later
                         for earlier, later in zip(times, times[1:]))
        self.assertTrue(descending, 'Tests are not ordered correctly.')


    def _get_bad_test_report(self, result_status='FAIL'):
        """
        Fetch the predicates of a failing test, and the parameters
        that are a fallout of this test failing.
        """
        predicates = collections.namedtuple('predicates',
                                            'status, testname, reason')
        fallout = collections.namedtuple('fallout',
                                         ('time_start, time_end, job_id,'
                                          'username, hostname'))
        test_report = collections.namedtuple('test_report',
                                             'predicates, fallout')
        return test_report(predicates(result_status, 'bad_test',
                                      'dreadful_reason'),
                           fallout('2014-01-01 01:01:01', 'None',
                                   self._FAKE_JOB_ID, 'user', 'myhost'))


    def testJobRetryTestFail(self):
        """Test retry works.

        A failing result is reported for job _FAKE_JOB_ID, which is mapped to
        test 'seven' and allowed one retry. The job creator is expected to be
        asked for a retry job, and both the retry map and the job-to-test map
        are expected to be updated with the new job.
        """
        test_to_retry = self.files['seven']
        fake_new_job_id = self._FAKE_JOB_ID + 1
        fake_new_job = FakeJob(id=fake_new_job_id)

        test_results = self._createSuiteMockResults()
        # Predicates and fallout are concatenated into the argument tuple of
        # a single result.
        self.schedule_and_expect_these_results(
                self.suite,
                [test_results[0] + test_results[1]],
                self.recorder)
        self.mox.StubOutWithMock(self.suite._job_creator, 'create_job')
        self.suite._job_creator.create_job(
                test_to_retry,
                retry_for=self._FAKE_JOB_ID).AndReturn(fake_new_job)
        self.mox.ReplayAll()
        self.suite.schedule(self.recorder.record_entry)
        # schedule() is stubbed out, so the state it would have produced is
        # set up by hand before waiting.
        self.suite._retry_handler._retry_map = {
                self._FAKE_JOB_ID: {'state': RetryHandler.States.NOT_ATTEMPTED,
                                    'retry_max': 1}
                }
        self.suite._jobs_to_tests[self._FAKE_JOB_ID] = test_to_retry
        self.suite.wait(self.recorder.record_entry)
        expected_retry_map = {
                self._FAKE_JOB_ID: {'state': RetryHandler.States.RETRIED,
                                    'retry_max': 1},
                fake_new_job_id: {'state': RetryHandler.States.NOT_ATTEMPTED,
                                  'retry_max': 0}
                }
        # Check retry map is correctly updated
        self.assertEqual(self.suite._retry_handler._retry_map,
                         expected_retry_map)
        # Check _jobs_to_tests is correctly updated
        self.assertEqual(self.suite._jobs_to_tests[fake_new_job_id],
                         test_to_retry)


    def testJobRetryTestWarn(self):
        """Test that no retry is scheduled if test warns.

        Runs the suite against results created with a 'WARN' status and
        verifies that a result was reported while neither the retry map
        nor _jobs_to_tests changed.
        """
        test_to_retry = self.files['seven']
        test_results = self._createSuiteMockResults(result_status='WARN')
        self.schedule_and_expect_these_results(
                self.suite,
                [test_results[0] + test_results[1]],
                self.recorder)
        self.mox.ReplayAll()
        self.suite.schedule(self.recorder.record_entry)
        self.suite._retry_handler._retry_map = {
                self._FAKE_JOB_ID: {'state': RetryHandler.States.NOT_ATTEMPTED,
                                    'retry_max': 1}
                }
        self.suite._jobs_to_tests[self._FAKE_JOB_ID] = test_to_retry
        # Snapshot before wait(); a shallow copy is enough here because the
        # check is about entries being added or removed.
        expected_jobs_to_tests = self.suite._jobs_to_tests.copy()
        # Spell the expectation out instead of copying the live map: a
        # shallow copy would share the inner per-job dict, so an in-place
        # change to 'state' would also change the expectation and the
        # assertion below could never fail.
        expected_retry_map = {
                self._FAKE_JOB_ID: {'state': RetryHandler.States.NOT_ATTEMPTED,
                                    'retry_max': 1}
                }
        self.suite.wait(self.recorder.record_entry)
        self.assertTrue(self.result_reporter.results)
        # Check retry map and _jobs_to_tests, ensure no retry was scheduled.
        self.assertEqual(self.suite._retry_handler._retry_map,
                         expected_retry_map)
        self.assertEqual(self.suite._jobs_to_tests, expected_jobs_to_tests)


    def testFailedJobRetry(self):
        """Make sure the suite survives even if the retry failed.

        create_job() is stubbed to raise error.RPCException when the retry
        is requested. The test then verifies that the original job is
        marked ATTEMPTED in the retry map and that _jobs_to_tests is the
        same as it was before wait() ran.
        """
        test_to_retry = self.files['seven']

        test_results = self._createSuiteMockResults()
        self.schedule_and_expect_these_results(
                self.suite,
                [test_results[0] + test_results[1]],
                self.recorder)
        self.mox.StubOutWithMock(self.suite._job_creator, 'create_job')
        self.suite._job_creator.create_job(
                test_to_retry, retry_for=self._FAKE_JOB_ID).AndRaise(
                error.RPCException('Expected during test'))
        # Do not file a bug.
        self.mox.StubOutWithMock(self.suite, '_should_report')
        self.suite._should_report(mox.IgnoreArg()).AndReturn(False)

        self.mox.ReplayAll()

        self.suite.schedule(self.recorder.record_entry)
        self.suite._retry_handler._retry_map = {
                self._FAKE_JOB_ID: {
                        'state': RetryHandler.States.NOT_ATTEMPTED,
                        'retry_max': 1}}
        self.suite._jobs_to_tests[self._FAKE_JOB_ID] = test_to_retry
        # Snapshot BEFORE wait(): taken afterwards, the copy would always
        # equal the live dict and the final assertion would be a tautology.
        expected_jobs_to_tests = self.suite._jobs_to_tests.copy()
        self.suite.wait(self.recorder.record_entry)
        expected_retry_map = {
                self._FAKE_JOB_ID: {
                        'state': RetryHandler.States.ATTEMPTED,
                        'retry_max': 1}}
        self.assertEqual(self.suite._retry_handler._retry_map,
                         expected_retry_map)
        # The failed retry must not have altered the job-to-test mapping.
        self.assertEqual(self.suite._jobs_to_tests, expected_jobs_to_tests)


class _MemoryResultReporter(SuiteBase._ResultReporter):
    """Result reporter for testing that keeps every result in memory.

    Reported results accumulate, in call order, in the `results` list.
    """
    def __init__(self):
        # Starts out empty; report() is the only method here that adds to it.
        self.results = list()

    def report(self, result):
        """Remember |result| by adding it to the end of `results`.

        @param result: the result object to store.
        """
        self.results += [result]


# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()