#!/usr/bin/env python # # Copyright (C) 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the 'License'); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an 'AS IS' BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import logging import os from vts.runners.host import asserts from vts.runners.host import base_test from vts.runners.host import const from vts.runners.host import keys from vts.runners.host import test_runner from vts.utils.python.controllers import adb from vts.utils.python.controllers import android_device from vts.utils.python.common import list_utils from vts.testcases.fuzz.template.libfuzzer_test import libfuzzer_test_config as config from vts.testcases.fuzz.template.libfuzzer_test.libfuzzer_test_case import LibFuzzerTestCase class LibFuzzerTest(base_test.BaseTestClass): """Runs LLVM libfuzzer tests on target. Attributes: _dut: AndroidDevice, the device under test as config. """ def setUpClass(self): """Creates a remote shell instance, and copies data files.""" required_params = [ keys.ConfigKeys.IKEY_DATA_FILE_PATH, keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE, ] self.getUserParams(required_params) logging.info('%s: %s', keys.ConfigKeys.IKEY_DATA_FILE_PATH, self.data_file_path) logging.info('%s: %s', keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE, self.binary_test_source) self._dut = self.registerController(android_device, False)[0] self._dut.stop() self._dut.adb.shell('mkdir %s -p' % config.FUZZER_TEST_DIR) def tearDownClass(self): """Deletes all copied data.""" self._dut.adb.shell('rm -rf %s' % config.FUZZER_TEST_DIR) self._dut.start() def PushFiles(self, src): """adb pushes test case file to target.""" push_src = os.path.join(self.data_file_path, src) push_dst = config.FUZZER_TEST_DIR self._dut.adb.push('%s %s' % (push_src, push_dst), no_except=True) logging.info('Adb pushed: %s \nto: %s', push_src, push_dst) return push_dst def CreateTestCases(self): """Creates LibFuzzerTestCase instances. Returns: LibFuzzerTestCase list. """ test_cases = map( lambda x: LibFuzzerTestCase(x, config.FUZZER_DEFAULT_PARAMS, {}), self.binary_test_source) return test_cases # TODO: retrieve the corpus. def CreateCorpusDir(self, test_case): """Creates corpus directory on the target.""" corpus_dir = test_case.GetCorpusName() self._dut.adb.shell('mkdir %s -p' % corpus_dir) def RunTestcase(self, test_case): """Runs the given test case and asserts the result. Args: test_case: LibFuzzerTestCase object """ self.PushFiles(test_case.bin_host_path) self.CreateCorpusDir(test_case) fuzz_cmd = '"%s"' % test_case.GetRunCommand() result = self._dut.adb.shell(fuzz_cmd, no_except=True) # TODO: upload the corpus and, possibly, crash log. self.AssertTestResult(test_case, result) def LogCrashReport(self, test_case): """Logs crash-causing fuzzer input. Reads the crash report file and logs the contents in format: '\x01\x23\x45\x67\x89\xab\xcd\xef' Args: test_case: LibFuzzerTestCase object """ touch_cmd = 'touch %s' % config.FUZZER_TEST_CRASH_REPORT self._dut.adb.shell(touch_cmd) # output is string of a hexdump from crash report file. # From the example above, output would be '0123456789abcdef'. xxd_cmd = 'xxd -p %s' % config.FUZZER_TEST_CRASH_REPORT output = self._dut.adb.shell(xxd_cmd) remove_chars = ['\r', '\t', '\n', ' '] for char in remove_chars: output = output.replace(char, '') crash_report = '' # output is guaranteed to be even in length since its a hexdump. for offset in xrange(0, len(output), 2): crash_report += '\\x%s' % output[offset:offset + 2] logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"', test_case.test_name, crash_report) # TODO(trong): differentiate between crashes and sanitizer rule violations. def AssertTestResult(self, test_case, result): """Asserts that test case finished as expected. Checks that device is in responsive state. If not, waits for boot then reports test as failure. If it is, asserts that all test commands returned exit code 0. Args: test_case: LibFuzzerTestCase object result: dict(str, str, int), command results from shell. """ logging.info('Test case results.') logging.info('stdout: %s' % result[const.STDOUT]) logging.info('stderr: %s' % result[const.STDERR]) logging.info('exit code: %s' % result[const.EXIT_CODE]) if not self._dut.hasBooted(): self._dut.waitForBootCompletion() asserts.fail('%s left the device in unresponsive state.' % test_case.test_name) exit_code = result[const.EXIT_CODE] if exit_code == config.ExitCode.FUZZER_TEST_FAIL: self.LogCrashReport(test_case) asserts.fail('%s failed normally.' % test_case.test_name) elif exit_code != config.ExitCode.FUZZER_TEST_PASS: asserts.fail('%s failed abnormally.' % test_case.test_name) def generateFuzzerTests(self): """Runs fuzzer tests.""" self.runGeneratedTests( test_func=self.RunTestcase, settings=self.CreateTestCases(), name_func=lambda x: x.test_name) if __name__ == '__main__': test_runner.main()