普通文本  |  168行  |  6.07 KB

#!/usr/bin/env python
#
# Copyright (C) 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import os

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import const
from vts.runners.host import keys
from vts.runners.host import test_runner
from vts.utils.python.controllers import adb
from vts.utils.python.controllers import android_device
from vts.utils.python.common import list_utils

from vts.testcases.fuzz.template.libfuzzer_test import libfuzzer_test_config as config
from vts.testcases.fuzz.template.libfuzzer_test.libfuzzer_test_case import LibFuzzerTestCase


class LibFuzzerTest(base_test.BaseTestClass):
    """Runs LLVM libfuzzer tests on target.

    Attributes:
        _dut: AndroidDevice, the device under test as config.
    """

    def setUpClass(self):
        """Creates a remote shell instance, and copies data files."""
        required_params = [
            keys.ConfigKeys.IKEY_DATA_FILE_PATH,
            keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
        ]
        self.getUserParams(required_params)

        logging.info('%s: %s', keys.ConfigKeys.IKEY_DATA_FILE_PATH,
                     self.data_file_path)
        logging.info('%s: %s', keys.ConfigKeys.IKEY_BINARY_TEST_SOURCE,
                     self.binary_test_source)

        self._dut = self.registerController(android_device, False)[0]
        self._dut.stop()
        self._dut.adb.shell('mkdir %s -p' % config.FUZZER_TEST_DIR)

    def tearDownClass(self):
        """Deletes all copied data."""
        self._dut.adb.shell('rm -rf %s' % config.FUZZER_TEST_DIR)
        self._dut.start()

    def PushFiles(self, src):
        """adb pushes test case file to target."""
        push_src = os.path.join(self.data_file_path, src)
        push_dst = config.FUZZER_TEST_DIR
        self._dut.adb.push('%s %s' % (push_src, push_dst), no_except=True)
        logging.info('Adb pushed: %s \nto: %s', push_src, push_dst)
        return push_dst

    def CreateTestCases(self):
        """Creates LibFuzzerTestCase instances.

        Returns:
            LibFuzzerTestCase list.
        """
        test_cases = map(
            lambda x: LibFuzzerTestCase(x, config.FUZZER_DEFAULT_PARAMS, {}),
            self.binary_test_source)
        return test_cases

    # TODO: retrieve the corpus.
    def CreateCorpusDir(self, test_case):
        """Creates corpus directory on the target."""
        corpus_dir = test_case.GetCorpusName()
        self._dut.adb.shell('mkdir %s -p' % corpus_dir)

    def RunTestcase(self, test_case):
        """Runs the given test case and asserts the result.

        Args:
            test_case: LibFuzzerTestCase object
        """
        self.PushFiles(test_case.bin_host_path)
        self.CreateCorpusDir(test_case)
        fuzz_cmd = '"%s"' % test_case.GetRunCommand()
        result = self._dut.adb.shell(fuzz_cmd, no_except=True)

        # TODO: upload the corpus and, possibly, crash log.
        self.AssertTestResult(test_case, result)

    def LogCrashReport(self, test_case):
        """Logs crash-causing fuzzer input.

        Reads the crash report file and logs the contents in format:
        '\x01\x23\x45\x67\x89\xab\xcd\xef'

        Args:
            test_case: LibFuzzerTestCase object
        """
        touch_cmd = 'touch %s' % config.FUZZER_TEST_CRASH_REPORT
        self._dut.adb.shell(touch_cmd)

        # output is string of a hexdump from crash report file.
        # From the example above, output would be '0123456789abcdef'.
        xxd_cmd = 'xxd -p %s' % config.FUZZER_TEST_CRASH_REPORT
        output = self._dut.adb.shell(xxd_cmd)
        remove_chars = ['\r', '\t', '\n', ' ']
        for char in remove_chars:
            output = output.replace(char, '')

        crash_report = ''
        # output is guaranteed to be even in length since its a hexdump.
        for offset in xrange(0, len(output), 2):
            crash_report += '\\x%s' % output[offset:offset + 2]

        logging.info('FUZZER_TEST_CRASH_REPORT for %s: "%s"',
                     test_case.test_name, crash_report)

    # TODO(trong): differentiate between crashes and sanitizer rule violations.
    def AssertTestResult(self, test_case, result):
        """Asserts that test case finished as expected.

        Checks that device is in responsive state. If not, waits for boot
        then reports test as failure. If it is, asserts that all test commands
        returned exit code 0.

        Args:
            test_case: LibFuzzerTestCase object
            result: dict(str, str, int), command results from shell.
        """
        logging.info('Test case results.')
        logging.info('stdout: %s' % result[const.STDOUT])
        logging.info('stderr: %s' % result[const.STDERR])
        logging.info('exit code: %s' % result[const.EXIT_CODE])
        if not self._dut.hasBooted():
            self._dut.waitForBootCompletion()
            asserts.fail('%s left the device in unresponsive state.' %
                         test_case.test_name)

        exit_code = result[const.EXIT_CODE]
        if exit_code == config.ExitCode.FUZZER_TEST_FAIL:
            self.LogCrashReport(test_case)
            asserts.fail('%s failed normally.' % test_case.test_name)
        elif exit_code != config.ExitCode.FUZZER_TEST_PASS:
            asserts.fail('%s failed abnormally.' % test_case.test_name)

    def generateFuzzerTests(self):
        """Runs fuzzer tests."""
        self.runGeneratedTests(
            test_func=self.RunTestcase,
            settings=self.CreateTestCases(),
            name_func=lambda x: x.test_name)


if __name__ == '__main__':
    test_runner.main()