普通文本  |  182行  |  7.8 KB

#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""VTS tests to verify boot/recovery image header versions."""

import logging
import os
import shutil
from struct import unpack
import tempfile
import zlib

from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import test_runner
from vts.utils.python.android import api
from vts.utils.python.file import target_file_utils

block_dev_path = "/dev/block/platform"  # path to platform block devices
PROPERTY_SLOT_SUFFIX = "ro.boot.slot_suffix"  # indicates current slot suffix for A/B devices
BOOT_HEADER_DTBO_SIZE_OFFSET = 1632  # offset of recovery dtbo size in boot header of version 1.


class VtsFirmwareBootHeaderVerificationTest(base_test.BaseTestClass):
    """Verifies boot/recovery image header.

    Attributes:
        temp_dir: The temporary directory on host.
        slot_suffix: The current slot suffix for A/B devices.
    """

    def setUpClass(self):
        """Initializes the DUT and creates temporary directories."""
        self.dut = self.android_devices[0]
        self.shell = self.dut.shell
        self.adb = self.dut.adb
        self.temp_dir = tempfile.mkdtemp()
        self.launch_api_level = self.dut.getLaunchApiLevel()
        logging.info("Create %s", self.temp_dir)
        self.slot_suffix = self.dut.getProp(PROPERTY_SLOT_SUFFIX)
        if self.slot_suffix is None:
            self.slot_suffix = ""
        logging.info("current slot suffix: %s", self.slot_suffix)

    def setUp(self):
        """Checks if the the preconditions to run the test are met."""
        if "x86" in self.dut.cpu_abi:
            global block_dev_path
            block_dev_path = "/dev/block"
            acpio_idx_string = self.adb.shell(
                "cat /proc/cmdline | "
                "grep -o \"androidboot.acpio_idx=[^ ]*\" |"
                "cut -d \"=\" -f 2 ").replace('\n','')
            asserts.skipIf((len(acpio_idx_string) == 0), "Skipping test for x86 NON-ACPI ABI")

    def get_number_of_pages(self, image_size, page_size):
        """Calculates the number of pages required for the image.

        Args:
            image_size: size of the image.
            page_size : size of page.

        Returns:
            Number of pages required for the image
        """
        return (image_size + page_size - 1) / page_size

    def checkValidRamdisk(self, ramdisk_image):
        """Verifies that the ramdisk extracted from boot.img is a valid gzipped cpio archive.

        Args:
            ramdisk_image: ramdisk extracted from boot.img.
        """
        # Set wbits parameter to zlib.MAX_WBITS|16 to expect a gzip header and
        # trailer.
        unzipped_ramdisk = zlib.decompress(ramdisk_image, zlib.MAX_WBITS|16)
        # The CPIO header magic can be "070701" or "070702" as per kernel
        # documentation: Documentation/early-userspace/buffer-format.txt
        cpio_header_magic = unzipped_ramdisk[0:6]
        asserts.assertTrue(cpio_header_magic == "070701" or cpio_header_magic == "070702",
                           "cpio archive header magic not found in ramdisk")

    def CheckImageHeader(self, boot_image, is_recovery=False):
        """Verifies the boot image format.

        Args:
            boot_image: Path to the boot image.
            is_recovery: Indicates that the image is recovery if true.
        """
        try:
            with open(boot_image, "rb") as image_file:
                image_file.read(8)  # read boot magic
                (kernel_size, _, ramdisk_size, _, _, _, _, page_size,
                 host_image_header_version) = unpack("9I", image_file.read(9 * 4))

                asserts.assertNotEqual(kernel_size, 0, "boot.img/recovery.img must contain kernel")

                if self.launch_api_level > api.PLATFORM_API_LEVEL_P:
                    asserts.assertTrue(
                        host_image_header_version >= 2,
                        "Device must atleast have a boot image of version 2")

                    asserts.assertNotEqual(ramdisk_size, 0, "boot.img must contain ramdisk")

                    # ramdisk comes after the header and kernel pages
                    num_kernel_pages = self.get_number_of_pages(kernel_size, page_size)
                    ramdisk_offset = page_size * (1 + num_kernel_pages)
                    image_file.seek(ramdisk_offset)
                    ramdisk_buf = image_file.read(ramdisk_size)
                    self.checkValidRamdisk(ramdisk_buf)
                else:
                    asserts.assertTrue(
                        host_image_header_version >= 1,
                        "Device must atleast have a boot image of version 1")
                image_file.seek(BOOT_HEADER_DTBO_SIZE_OFFSET)
                recovery_dtbo_size = unpack("I", image_file.read(4))[0]
                image_file.read(8)  # ignore recovery dtbo load address
                if is_recovery:
                    asserts.assertNotEqual(
                        recovery_dtbo_size, 0,
                        "recovery partition for non-A/B devices must contain the recovery DTBO"
                    )
                boot_header_size = unpack("I", image_file.read(4))[0]
                if host_image_header_version > 1:
                    dtb_size = unpack("I", image_file.read(4))[0]
                    asserts.assertNotEqual(dtb_size, 0, "Boot/recovery image must contain DTB")
                    image_file.read(8)  # ignore DTB physical load address
                expected_header_size = image_file.tell()
                asserts.assertEqual(
                    boot_header_size, expected_header_size,
                    "Test failure due to boot header size mismatch. Expected %s Actual %s"
                    % (expected_header_size, boot_header_size))
        except IOError as e:
            logging.exception(e)
            asserts.fail("Unable to open boot image file")

    def testBootImageHeader(self):
        """Validates boot image header."""
        current_boot_partition = "boot" + str(self.slot_suffix)
        boot_path = target_file_utils.FindFiles(
            self.shell, block_dev_path, current_boot_partition, "-type l")
        logging.info("Boot path %s", boot_path)
        if not boot_path:
            asserts.fail("Unable to find path to boot image on device.")
        host_boot_path = os.path.join(self.temp_dir, "boot.img")
        self.adb.pull("%s %s" % (boot_path[0], host_boot_path))
        self.CheckImageHeader(host_boot_path)

    def testRecoveryImageHeader(self):
        """Validates recovery image header."""
        asserts.skipIf(self.slot_suffix,
                       "A/B devices do not have a separate recovery partition")
        recovery_path = target_file_utils.FindFiles(self.shell, block_dev_path,
                                                    "recovery", "-type l")
        logging.info("recovery path %s", recovery_path)
        if not recovery_path:
            asserts.fail("Unable to find path to recovery image on device.")
        host_recovery_path = os.path.join(self.temp_dir, "recovery.img")
        self.adb.pull("%s %s" % (recovery_path[0], host_recovery_path))
        self.CheckImageHeader(host_recovery_path, True)

    def tearDownClass(self):
        """Deletes temporary directories."""
        shutil.rmtree(self.temp_dir)


if __name__ == "__main__":
    test_runner.main()