# Plain text  |  161 lines  |  5.61 KB

#!/usr/bin/python
#
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import datetime, unittest

import mox

import common

# setup_django_lite_environment must be imported first so that the database
# is set up correctly before the other autotest modules are imported.
from autotest_lib.frontend import setup_django_lite_environment
from autotest_lib.client.common_lib import utils
from autotest_lib.frontend.afe import models, rpc_interface
from django import test
from autotest_lib.server.cros import repair_utils


# See complete_failures_functional_tests.py for why we need this.
class MockDatetime(datetime.datetime):
    """Subclass of datetime.datetime that the tests can stub out.

    The test fixture stubs out `today` on this class and installs the class
    as datetime.datetime for the duration of each test.
    """


# We use a mock RPC client object so that calls go directly to the server-side
# rpc_interface functions instead of through a real RPC client.
class MockAFE:
    """Stand-in for the rpc client that dispatches straight to rpc_interface."""
    def run(self, func, **kwargs):
        """
        Look up the named function on rpc_interface and call it in-process.

        @param func: The name of the rpc_interface function to call.

        @param kwargs: The keyword arguments forwarded to that function.

        @return: The function's result after utils.strip_unicode is applied.
        """
        rpc_function = getattr(rpc_interface, func)
        raw_result = rpc_function(**kwargs)
        return utils.strip_unicode(raw_result)


class FindProblemTestTests(mox.MoxTestBase, test.TestCase):
    """Check which job repair_utils._find_problem_test picks for a host."""


    def setUp(self):
        """Stub out today() and remember the values tearDown must restore."""
        super(FindProblemTestTests, self).setUp()
        self.mox.StubOutWithMock(MockDatetime, 'today')

        # Swap in MockDatetime so datetime.datetime.today() hits the stub.
        self._real_datetime = datetime.datetime
        datetime.datetime = MockDatetime
        self._saved_cutoff_mins = repair_utils._CUTOFF_AFTER_TIMEOUT_MINS
        self._saved_timeout_mins = repair_utils._DEFAULT_TEST_TIMEOUT_MINS


    def tearDown(self):
        """Put back the repair_utils constants and the real datetime class."""
        repair_utils._DEFAULT_TEST_TIMEOUT_MINS = self._saved_timeout_mins
        repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = self._saved_cutoff_mins
        datetime.datetime = self._real_datetime
        super(FindProblemTestTests, self).tearDown()


    def _create_host(self, hostname):
        """Save and return a Host row with the given hostname."""
        host_row = models.Host(hostname=hostname)
        host_row.save()
        return host_row


    def _create_job(self, name, created_on):
        """Save and return a Job row owned by 'me'."""
        job_row = models.Job(owner='me', name=name, created_on=created_on)
        job_row.save()
        return job_row


    def _create_queue_entry(self, job, host, started_on):
        """Save and return a HostQueueEntry tying the job to the host."""
        entry_row = models.HostQueueEntry(job=job, host=host, status='test',
                                          started_on=started_on)
        entry_row.save()
        return entry_row


    def _run_find_problem_test(self, hostname, now):
        """
        Call repair_utils._find_problem_test with the current time pinned.

        @param hostname: The hostname passed to _find_problem_test.

        @param now: The value the stubbed datetime.datetime.today() returns.

        @return: Whatever _find_problem_test returns.
        """
        afe = MockAFE()
        # Record the single today() call expected during the lookup.
        datetime.datetime.today().AndReturn(now)
        repair_utils._DEFAULT_TEST_TIMEOUT_MINS = 1440

        repair_utils._CUTOFF_AFTER_TIMEOUT_MINS = 60

        self.mox.ReplayAll()
        return repair_utils._find_problem_test(hostname, afe)


    def test_should_get_most_recent_job(self):
        """Of two jobs on one host, the one started later is returned."""

        machine = self._create_host('host')

        earlier_job = self._create_job('old_job',
                                       datetime.datetime(2012, 1, 1))
        self._create_queue_entry(earlier_job, machine,
                                 datetime.datetime(2012, 1, 1, 1))

        later_job = self._create_job('new_job',
                                     datetime.datetime(2012, 1, 1))
        self._create_queue_entry(later_job, machine,
                                 datetime.datetime(2012, 1, 1, 2))

        found = self._run_find_problem_test('host',
                                            datetime.datetime(2012, 1, 1))

        self.assertEqual(found['job']['name'], 'new_job')


    def test_should_get_job_for_specified_host_only(self):
        """A job queued on a different host is not returned."""

        # The job is saved before its host here, as in the other-host case.
        wanted_job = self._create_job('correct_job',
                                      datetime.datetime(2012, 1, 1))
        wanted_host = self._create_host('correct_host')
        self._create_queue_entry(wanted_job, wanted_host,
                                 datetime.datetime(2012, 1, 1, 1))

        other_job = self._create_job('wrong_job',
                                     datetime.datetime(2012, 1, 1))
        other_host = self._create_host('wrong_host')
        self._create_queue_entry(other_job, other_host,
                                 datetime.datetime(2012, 1, 1, 2))

        found = self._run_find_problem_test('correct_host',
                                            datetime.datetime(2012, 1, 1))

        self.assertEqual(found['job']['name'], 'correct_job')


    def test_return_jobs_ran_soon_after_max_job_runtime(self):
        """A job created 24.5 hours before 'today' is still returned."""

        machine = self._create_host('host')

        recent_job = self._create_job('new_job',
                                      datetime.datetime(2012, 1, 1, 0, 0))
        self._create_queue_entry(recent_job, machine,
                                 datetime.datetime(2012, 1, 1, 2))

        found = self._run_find_problem_test(
                'host', datetime.datetime(2012, 1, 2, 0, 30))

        self.assertEqual(found['job']['name'], 'new_job')


# Run the tests in this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()