#!/usr/bin/python import logging, os, select, StringIO, subprocess, sys, unittest import common from autotest_lib.client.common_lib import logging_manager, logging_config class PipedStringIO(object): """ Like StringIO, but all I/O passes through a pipe. This means a PipedStringIO is backed by a file descriptor is thus can do things like pass down to a subprocess. However, this means the creating process must call read_pipe() (or the classmethod read_all_pipes()) periodically to read the pipe, and must call close() (or the classmethod cleanup()) to close the pipe. """ _instances = set() def __init__(self): self._string_io = StringIO.StringIO() self._read_end, self._write_end = os.pipe() PipedStringIO._instances.add(self) def close(self): self._string_io.close() os.close(self._read_end) os.close(self._write_end) PipedStringIO._instances.remove(self) def write(self, data): os.write(self._write_end, data) def flush(self): pass def fileno(self): return self._write_end def getvalue(self): self.read_pipe() return self._string_io.getvalue() def read_pipe(self): while True: read_list, _, _ = select.select([self._read_end], [], [], 0) if not read_list: return self._string_io.write(os.read(self._read_end, 1024)) @classmethod def read_all_pipes(cls): for instance in cls._instances: instance.read_pipe() @classmethod def cleanup_all_instances(cls): for instance in list(cls._instances): instance.close() LOGGING_FORMAT = '%(levelname)s: %(message)s' _EXPECTED_STDOUT = """\ print 1 system 1 INFO: logging 1 INFO: print 2 INFO: system 2 INFO: logging 2 INFO: print 6 INFO: system 6 INFO: logging 6 print 7 system 7 INFO: logging 7 """ _EXPECTED_LOG1 = """\ INFO: print 3 INFO: system 3 INFO: logging 3 INFO: print 4 INFO: system 4 INFO: logging 4 INFO: print 5 INFO: system 5 INFO: logging 5 """ _EXPECTED_LOG2 = """\ INFO: print 4 INFO: system 4 INFO: logging 4 """ class DummyLoggingConfig(logging_config.LoggingConfig): console_formatter = logging.Formatter(LOGGING_FORMAT) def __init__(self): super(DummyLoggingConfig, self).__init__() self.log = PipedStringIO() def add_debug_file_handlers(self, log_dir, log_name=None): self.logger.addHandler(logging.StreamHandler(self.log)) # this isn't really a unit test since it creates subprocesses and pipes and all # that. i just use the unittest framework because it's convenient. class LoggingManagerTest(unittest.TestCase): def setUp(self): self.stdout = PipedStringIO() self._log1 = PipedStringIO() self._log2 = PipedStringIO() self._real_system_calls = False # the LoggingManager will change self.stdout (by design), so keep a # copy around self._original_stdout = self.stdout # clear out import-time logging config and reconfigure root_logger = logging.getLogger() for handler in list(root_logger.handlers): root_logger.removeHandler(handler) # use INFO to ignore debug output from logging_manager itself logging.basicConfig(level=logging.INFO, format=LOGGING_FORMAT, stream=self.stdout) self._config_object = DummyLoggingConfig() logging_manager.LoggingManager.logging_config_object = ( self._config_object) def tearDown(self): PipedStringIO.cleanup_all_instances() def _say(self, suffix): print >>self.stdout, 'print %s' % suffix if self._real_system_calls: os.system('echo system %s >&%s' % (suffix, self._original_stdout.fileno())) else: print >>self.stdout, 'system %s' % suffix logging.info('logging %s', suffix) PipedStringIO.read_all_pipes() def _setup_manager(self, manager_class=logging_manager.LoggingManager): def set_stdout(file_object): self.stdout = file_object manager = manager_class() manager.manage_stream(self.stdout, logging.INFO, set_stdout) return manager def _run_test(self, manager_class): manager = self._setup_manager(manager_class) self._say(1) manager.start_logging() self._say(2) manager.redirect_to_stream(self._log1) self._say(3) manager.tee_redirect_to_stream(self._log2) self._say(4) manager.undo_redirect() self._say(5) manager.undo_redirect() self._say(6) manager.stop_logging() self._say(7) def _grab_fd_info(self): command = 'ls -l /proc/%s/fd' % os.getpid() proc = subprocess.Popen(command.split(), shell=True, stdout=subprocess.PIPE) return proc.communicate()[0] def _compare_logs(self, log_buffer, expected_text): actual_lines = log_buffer.getvalue().splitlines() expected_lines = expected_text.splitlines() if self._real_system_calls: # because of the many interacting processes, we can't ensure perfect # interleaving. so compare sets of lines rather than ordered lines. actual_lines = set(actual_lines) expected_lines = set(expected_lines) self.assertEquals(actual_lines, expected_lines) def _check_results(self): # ensure our stdout was restored self.assertEquals(self.stdout, self._original_stdout) if self._real_system_calls: # ensure FDs were left in their original state self.assertEquals(self._grab_fd_info(), self._fd_info) self._compare_logs(self.stdout, _EXPECTED_STDOUT) self._compare_logs(self._log1, _EXPECTED_LOG1) self._compare_logs(self._log2, _EXPECTED_LOG2) def test_logging_manager(self): self._run_test(logging_manager.LoggingManager) self._check_results() def test_fd_redirection_logging_manager(self): self._real_system_calls = True self._fd_info = self._grab_fd_info() self._run_test(logging_manager.FdRedirectionLoggingManager) self._check_results() def test_tee_redirect_debug_dir(self): manager = self._setup_manager() manager.start_logging() manager.tee_redirect_debug_dir('/fake/dir', tag='mytag') print >>self.stdout, 'hello' manager.undo_redirect() print >>self.stdout, 'goodbye' manager.stop_logging() self._compare_logs(self.stdout, 'INFO: mytag : hello\nINFO: goodbye') self._compare_logs(self._config_object.log, 'hello\n') class MonkeyPatchTestCase(unittest.TestCase): def setUp(self): filename = os.path.split(__file__)[1] if filename.endswith('.pyc'): filename = filename[:-1] self.expected_filename = filename def check_filename(self, filename, expected=None): if expected is None: expected = [self.expected_filename] self.assertIn(os.path.split(filename)[1], expected) def _0_test_find_caller(self): finder = logging_manager._logging_manager_aware_logger__find_caller filename, lineno, caller_name = finder(logging_manager.logger) self.check_filename(filename) self.assertEquals('test_find_caller', caller_name) def _1_test_find_caller(self): self._0_test_find_caller() def test_find_caller(self): self._1_test_find_caller() def _0_test_non_reported_find_caller(self): finder = logging_manager._logging_manager_aware_logger__find_caller filename, lineno, caller_name = finder(logging_manager.logger) # Python 2.4 unittest implementation will call the unittest method in # file 'unittest.py', and Python >= 2.6 does the same in 'case.py' self.check_filename(filename, expected=['unittest.py', 'case.py']) def _1_test_non_reported_find_caller(self): self._0_test_non_reported_find_caller() @logging_manager.do_not_report_as_logging_caller def test_non_reported_find_caller(self): self._1_test_non_reported_find_caller() if __name__ == '__main__': unittest.main()