普通文本  |  298行  |  10.05 KB

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging
import unittest
import re
import csv
import common
import os

from itertools import imap
from autotest_lib.server.cros import resource_monitor
from autotest_lib.server.hosts import abstract_ssh
from autotest_lib.server import utils

class HostMock(abstract_ssh.AbstractSSHHost):
    """Mocks a host object."""

    TOP_PID = '1234'

    def _initialize(self, test_env):
        self.top_is_running = False

        # Keep track of whether the top raw output file exists on the system,
        # and if it does, where it is.
        self.top_output_file_path = None

        # Keep track of whether the raw top output file is currently being
        # written to by top.
        self.top_output_file_is_open = False
        self.test_env = test_env


    def get_file(self, src, dest):
        pass


    def called_unsupported_command(self, command):
        """Raises assertion error when called.

        @param command string the unsupported command called.

        """
        raise AssertionError(
                "ResourceMonitor called unsupported command %s" % command)


    def _process_top(self, cmd_args, cmd_line):
        """Process top command.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.

        """
        self.test_env.assertFalse(self.top_is_running,
                msg="Top must not already be running.")
        self.test_env.assertFalse(self.top_output_file_is_open,
                msg="The top output file should not be being written "
                "to before top is started")
        self.test_env.assertIsNone(self.top_output_file_path,
                msg="The top output file should not exist"
                "before top is started")
        try:
            self.redirect_index = cmd_args.index(">")
            self.top_output_file_path = cmd_args[self.redirect_index + 1]
        except ValueError, IndexError:
            self.called_unsupported_command(cmd_line)

        self.top_is_running = True
        self.top_output_file_is_open = True

        return HostMock.TOP_PID


    def _process_kill(self, cmd_args, cmd_line):
        """Process kill command.

        @param cmd_args string_list of command line args.
        @param cmd_line string the command to run.

        """
        try:
            if cmd_args[1].startswith('-'):
                pid_to_kill = cmd_args[2]
            else:
                pid_to_kill = cmd_args[1]
        except IndexError:
            self.called_unsupported_command(cmd_line)

        self.test_env.assertEqual(pid_to_kill, HostMock.TOP_PID,
                msg="Wrong pid (%r) killed . Top pid is %r." % (pid_to_kill,
                HostMock.TOP_PID))
        self.test_env.assertTrue(self.top_is_running,
                msg="Top must be running before we try to kill it")

        self.top_is_running = False
        self.top_output_file_is_open = False


    def _process_rm(self, cmd_args, cmd_line):
        """Process rm command.

        @param cmd_args string list list of command line args.
        @param cmd_line string the command to run.

        """
        try:
            if cmd_args[1].startswith('-'):
                file_to_rm = cmd_args[2]
            else:
                file_to_rm = cmd_args[1]
        except IndexError:
            self.called_unsupported_command(cmd_line)

        self.test_env.assertEqual(file_to_rm, self.top_output_file_path,
                msg="Tried to remove file that is not the top output file.")
        self.test_env.assertFalse(self.top_output_file_is_open,
                msg="Tried to remove top output file while top is still "
                "writing to it.")
        self.test_env.assertFalse(self.top_is_running,
                msg="Top was still running when we tried to remove"
                "the top output file.")
        self.test_env.assertIsNotNone(self.top_output_file_path)

        self.top_output_file_path = None


    def _run_single_cmd(self, cmd_line, *args, **kwargs):
        """Run a single command on host.

        @param cmd_line command to run on the host.

        """
        # Make the input a little nicer.
        cmd_line = cmd_line.strip()
        cmd_line = re.sub(">", " > ", cmd_line)

        cmd_args = re.split("\s+", cmd_line)
        self.test_env.assertTrue(len(cmd_args) >= 1)
        command = cmd_args[0]
        if (command == "top"):
            return self._process_top(cmd_args, cmd_line)
        elif (command == "kill"):
            return self._process_kill(cmd_args, cmd_line)
        elif(command == "rm"):
            return self._process_rm(cmd_args, cmd_line)
        else:
            logging.warning("Called unemulated command %r", cmd_line)
            return None


    def run(self, cmd_line, *args, **kwargs):
        """Run command(s) on host.

        @param cmd_line command to run on the host.
        @return CmdResult object.

        """
        cmds = re.split("&&", cmd_line)
        for cmd in cmds:
            self._run_single_cmd(cmd)
        return utils.CmdResult(exit_status=0)


    def run_background(self, cmd_line, *args, **kwargs):
        """Run command in background on host.

        @param cmd_line command to run on the host.

        """
        return self._run_single_cmd(cmd_line, args, kwargs)


    def is_monitoring(self):
        """Return true iff host is currently running top and writing output
            to a file.
        """
        return self.top_is_running and self.top_output_file_is_open and (
            self.top_output_file_path is not None)


    def monitoring_stopped(self):
        """Return true iff host is not running top and all top output files are
            closed.
        """
        return not self.is_monitoring()


class ResourceMonitorTest(unittest.TestCase):
    """Tests the non-trivial functionality of ResourceMonitor."""

    def setUp(self):
        self.topoutfile = '/tmp/resourcemonitorunittest-1234'
        self.monitor_period = 1
        self.rm_conf = resource_monitor.ResourceMonitorConfig(
                monitor_period=self.monitor_period,
                rawresult_output_filename=self.topoutfile)
        self.host = HostMock(self)


    def test_normal_operation(self):
        """Checks that normal (i.e. no exceptions, etc.) execution works."""
        self.assertFalse(self.host.is_monitoring())
        with resource_monitor.ResourceMonitor(self.host, self.rm_conf) as rm:
            self.assertFalse(self.host.is_monitoring())
            for i in range(3):
                rm.start()
                self.assertTrue(self.host.is_monitoring())
                rm.stop()
                self.assertTrue(self.host.monitoring_stopped())
        self.assertTrue(self.host.monitoring_stopped())


    def test_forgot_to_stop_monitor(self):
        """Checks that resource monitor is cleaned up even if user forgets to
            explicitly stop it.
        """
        self.assertFalse(self.host.is_monitoring())
        with resource_monitor.ResourceMonitor(self.host, self.rm_conf) as rm:
            self.assertFalse(self.host.is_monitoring())
            rm.start()
            self.assertTrue(self.host.is_monitoring())
        self.assertTrue(self.host.monitoring_stopped())


    def test_unexpected_interruption_while_monitoring(self):
        """Checks that monitor is cleaned up upon unexpected interrupt."""
        self.assertFalse(self.host.is_monitoring())

        with resource_monitor.ResourceMonitor(self.host, self.rm_conf) as rm:
            self.assertFalse(self.host.is_monitoring())
            rm.start()
            self.assertTrue(self.host.is_monitoring())
            raise KeyboardInterrupt

        self.assertTrue(self.host.monitoring_stopped())


class ResourceMonitorResultTest(unittest.TestCase):
    """Functional tests for ResourceMonitorParsedResult."""

    def setUp(self):
        self._res_dir = os.path.join(
                            os.path.dirname(os.path.realpath(__file__)),
                            'res_resource_monitor')


    def run_with_test_data(self, testdata_file, testans_file):
        """Parses testdata_file with the parses, and checks that results
            are the same as those in testans_file.

        @param testdata_file string filename containing top output to test.
        @param testans_file string filename containing answers to the test.

        """
        parsed_results = resource_monitor.ResourceMonitorParsedResult(
                testdata_file)
        with open(testans_file, "rb") as testans:
            csvreader = csv.reader(testans)
            columns = csvreader.next()
            self.assertEqual(list(columns),
                    resource_monitor.ResourceMonitorParsedResult._columns)
            utils_over_time = []
            for util_val in imap(
                    resource_monitor.
                            ResourceMonitorParsedResult.UtilValues._make,
                    csvreader):
                utils_over_time.append(util_val)
            self.assertEqual(utils_over_time, parsed_results._utils_over_time)


    def test_full_data(self):
        """General test with many possible changes to input."""
        self.run_with_test_data(
                os.path.join(self._res_dir, 'top_test_data.txt'),
                os.path.join(self._res_dir, 'top_test_data_ans.csv'))


    def test_whitespace_ridden(self):
        """Tests resilience to arbitrary whitespace characters between fields"""
        self.run_with_test_data(
                os.path.join(self._res_dir, 'top_whitespace_ridden.txt'),
                os.path.join(self._res_dir, 'top_whitespace_ridden_ans.csv'))


    def test_field_order_changed(self):
        """Tests resilience to changes in the order of fields
            (for e.g, if the Mem free/used fields change orders in the input).
        """
        self.run_with_test_data(
                os.path.join(self._res_dir, 'top_field_order_changed.txt'),
                os.path.join(self._res_dir, 'top_field_order_changed_ans.csv'))


if __name__ == '__main__':
    unittest.main()