普通文本  |  184行  |  5.92 KB

#!/usr/bin/env python

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import task_loop

import glib
import logging
import mox
import time
import unittest

class TaskLoopTestCase(unittest.TestCase):
    """
    Test fixture for TaskLoop class.

    These unit-tests have a trade-off between speed and stability. The whole
    suite takes about half a minute to run, and probably could be speeded up a
    little bit. But, making the numbers too small might make the tests flaky.
    """

    def setUp(self):
        self._mox = mox.Mox()
        self._callback_mocker = self._mox.CreateMock(TaskLoopTestCase)
        self._task_loop = task_loop.get_instance()

    # ##########################################################################
    # Tests

    def test_post_task_simple(self):
        """Post a simple task and expect it gets dispatched."""
        self._callback_mocker._callback()

        self._mox.ReplayAll()
        self._task_loop.post_task(self._callback_mocker._callback)
        self._run_task_loop(2)
        self._mox.VerifyAll()


    def test_post_task_set_attribute(self):
        """Post a task that accesses an attribute from the context object."""
        self.flag = False
        self._task_loop.post_task(self._callback_set_attribute)
        self._run_task_loop(2)
        self.assertTrue(self.flag)


    def test_post_task_with_argument(self):
        """Post task with some argument."""
        arg = True
        self._callback_mocker._callback_with_arguments(arg)

        self._mox.ReplayAll()
        self._task_loop.post_task(
                self._callback_mocker._callback_with_arguments, arg)
        self._run_task_loop(2)
        self._mox.VerifyAll()


    def test_post_task_after_delay(self):
        """Post a task with some delay and check that the delay is respected."""
        start_time = time.time()
        self.time = start_time
        self._task_loop.post_task_after_delay(self._callback_set_time, 3000)
        self._run_task_loop(5)
        delayed_time = self.time - start_time
        self.assertGreaterEqual(delayed_time, 3)


    def test_post_repeated_task(self):
        """Post a repeated task and check it gets dispatched multiple times."""
        self.count = 0
        self._task_loop.post_repeated_task(self._callback_increment_count, 1000)
        self._run_task_loop(5)
        self.assertGreaterEqual(self.count, 3)


    def test_ignore_delays(self):
        """Post a task and test ignore_delays mode."""
        self._task_loop.ignore_delays = False

        self._task_loop.post_task_after_delay(self._callback_mocker._callback,
                                              10000)
        # Not enough time to dispatch the posted task
        self._run_task_loop(1)
        self._mox.VerifyAll()


    def test_cancel_posted_task(self):
        """Test that a cancelled task is not dispatched."""
        post_id = self._task_loop.post_task_after_delay(
                self._callback_mocker._callback,
                2000)
        self._task_loop.post_task(self._callback_cancel_task, post_id)
        self._run_task_loop(3)
        self._mox.VerifyAll()


    def test_multiple_cancels(self):
        """Test that successive cancels after a successful cancel fail."""
        post_id = self._task_loop.post_task_after_delay(
                self._callback_mocker._callback,
                2000)
        self._task_loop.post_task(self._callback_cancel_task, post_id)
        self._task_loop.post_task(self._callback_cancel_cancelled_task, post_id)
        self._run_task_loop(3)
        self._mox.VerifyAll()


    def test_random_delays(self):
        """Test that random delays works (sort of). This test could be flaky."""
        # Warning: This test could be flaky. Add more differences?
        self.count = 0
        self.times = {}
        self._task_loop.random_delays = True
        self._task_loop.max_random_delay_ms = 1000
        self._task_loop.post_repeated_task(self._callback_record_times, 500)
        self._run_task_loop(5)
        self.assertGreaterEqual(self.count, 4)
        # Test that not all time gaps are almost the same
        diff1 = round(self.times[1] - self.times[0], 3)
        diff2 = round(self.times[2] - self.times[1], 3)
        diff3 = round(self.times[3] - self.times[2], 3)
        self.assertTrue(diff1 != diff2 or diff2 != diff3 or diff3 != diff1)

    # ##########################################################################
    # Helper functions

    def _stop_task_loop(self):
        print('Stopping task_loop.')
        self._task_loop.stop()

    def _run_task_loop(self, run_for_seconds):
        """
        Runs the task loop for |run_for_seconds| seconds. This function is
        blocking, so the main thread will return only after |run_for_seconds|.
        """
        # post a task to stop the task loop.
        glib.timeout_add(run_for_seconds*1000, self._stop_task_loop)
        self._task_loop.start()
        # We will continue only when the stop task has been executed.

    # ##########################################################################
    # Callbacks for tests

    def _callback(self):
        print('Actual TaskLoopTestCase._callback called!')


    def _callback_set_attribute(self):
        self.flag = True


    def _callback_set_time(self):
        self.time = time.time()


    def _callback_increment_count(self):
        self.count = self.count + 1


    def _callback_record_times(self):
        self.times[self.count] = time.time()
        self.count = self.count + 1


    def _callback_with_arguments(self, arg):
        pass


    def _callback_cancel_task(self, post_id):
        self._task_loop.cancel_posted_task(post_id)


    def _callback_cancel_cancelled_task(self, post_id):
        self.assertFalse(self._task_loop.cancel_posted_task(post_id))


if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    unittest.main()