#!/usr/bin/env python
#
# Copyright (C) 2018 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""VTS tests to verify boot/recovery image header versions."""
import logging
import os
import shutil
from struct import unpack
import tempfile
import zlib
from vts.runners.host import asserts
from vts.runners.host import base_test
from vts.runners.host import test_runner
from vts.utils.python.android import api
from vts.utils.python.file import target_file_utils
block_dev_path = "/dev/block/platform" # path to platform block devices
PROPERTY_SLOT_SUFFIX = "ro.boot.slot_suffix" # indicates current slot suffix for A/B devices
BOOT_HEADER_DTBO_SIZE_OFFSET = 1632 # offset of recovery dtbo size in boot header of version 1.
class VtsFirmwareBootHeaderVerificationTest(base_test.BaseTestClass):
"""Verifies boot/recovery image header.
Attributes:
temp_dir: The temporary directory on host.
slot_suffix: The current slot suffix for A/B devices.
"""
def setUpClass(self):
"""Initializes the DUT and creates temporary directories."""
self.dut = self.android_devices[0]
self.shell = self.dut.shell
self.adb = self.dut.adb
self.temp_dir = tempfile.mkdtemp()
self.launch_api_level = self.dut.getLaunchApiLevel()
logging.info("Create %s", self.temp_dir)
self.slot_suffix = self.dut.getProp(PROPERTY_SLOT_SUFFIX)
if self.slot_suffix is None:
self.slot_suffix = ""
logging.info("current slot suffix: %s", self.slot_suffix)
def setUp(self):
"""Checks if the the preconditions to run the test are met."""
if "x86" in self.dut.cpu_abi:
global block_dev_path
block_dev_path = "/dev/block"
acpio_idx_string = self.adb.shell(
"cat /proc/cmdline | "
"grep -o \"androidboot.acpio_idx=[^ ]*\" |"
"cut -d \"=\" -f 2 ").replace('\n','')
asserts.skipIf((len(acpio_idx_string) == 0), "Skipping test for x86 NON-ACPI ABI")
def get_number_of_pages(self, image_size, page_size):
"""Calculates the number of pages required for the image.
Args:
image_size: size of the image.
page_size : size of page.
Returns:
Number of pages required for the image
"""
return (image_size + page_size - 1) / page_size
def checkValidRamdisk(self, ramdisk_image):
"""Verifies that the ramdisk extracted from boot.img is a valid gzipped cpio archive.
Args:
ramdisk_image: ramdisk extracted from boot.img.
"""
# Set wbits parameter to zlib.MAX_WBITS|16 to expect a gzip header and
# trailer.
unzipped_ramdisk = zlib.decompress(ramdisk_image, zlib.MAX_WBITS|16)
# The CPIO header magic can be "070701" or "070702" as per kernel
# documentation: Documentation/early-userspace/buffer-format.txt
cpio_header_magic = unzipped_ramdisk[0:6]
asserts.assertTrue(cpio_header_magic == "070701" or cpio_header_magic == "070702",
"cpio archive header magic not found in ramdisk")
def CheckImageHeader(self, boot_image, is_recovery=False):
"""Verifies the boot image format.
Args:
boot_image: Path to the boot image.
is_recovery: Indicates that the image is recovery if true.
"""
try:
with open(boot_image, "rb") as image_file:
image_file.read(8) # read boot magic
(kernel_size, _, ramdisk_size, _, _, _, _, page_size,
host_image_header_version) = unpack("9I", image_file.read(9 * 4))
asserts.assertNotEqual(kernel_size, 0, "boot.img/recovery.img must contain kernel")
if self.launch_api_level > api.PLATFORM_API_LEVEL_P:
asserts.assertTrue(
host_image_header_version >= 2,
"Device must atleast have a boot image of version 2")
asserts.assertNotEqual(ramdisk_size, 0, "boot.img must contain ramdisk")
# ramdisk comes after the header and kernel pages
num_kernel_pages = self.get_number_of_pages(kernel_size, page_size)
ramdisk_offset = page_size * (1 + num_kernel_pages)
image_file.seek(ramdisk_offset)
ramdisk_buf = image_file.read(ramdisk_size)
self.checkValidRamdisk(ramdisk_buf)
else:
asserts.assertTrue(
host_image_header_version >= 1,
"Device must atleast have a boot image of version 1")
image_file.seek(BOOT_HEADER_DTBO_SIZE_OFFSET)
recovery_dtbo_size = unpack("I", image_file.read(4))[0]
image_file.read(8) # ignore recovery dtbo load address
if is_recovery:
asserts.assertNotEqual(
recovery_dtbo_size, 0,
"recovery partition for non-A/B devices must contain the recovery DTBO"
)
boot_header_size = unpack("I", image_file.read(4))[0]
if host_image_header_version > 1:
dtb_size = unpack("I", image_file.read(4))[0]
asserts.assertNotEqual(dtb_size, 0, "Boot/recovery image must contain DTB")
image_file.read(8) # ignore DTB physical load address
expected_header_size = image_file.tell()
asserts.assertEqual(
boot_header_size, expected_header_size,
"Test failure due to boot header size mismatch. Expected %s Actual %s"
% (expected_header_size, boot_header_size))
except IOError as e:
logging.exception(e)
asserts.fail("Unable to open boot image file")
def testBootImageHeader(self):
"""Validates boot image header."""
current_boot_partition = "boot" + str(self.slot_suffix)
boot_path = target_file_utils.FindFiles(
self.shell, block_dev_path, current_boot_partition, "-type l")
logging.info("Boot path %s", boot_path)
if not boot_path:
asserts.fail("Unable to find path to boot image on device.")
host_boot_path = os.path.join(self.temp_dir, "boot.img")
self.adb.pull("%s %s" % (boot_path[0], host_boot_path))
self.CheckImageHeader(host_boot_path)
def testRecoveryImageHeader(self):
"""Validates recovery image header."""
asserts.skipIf(self.slot_suffix,
"A/B devices do not have a separate recovery partition")
recovery_path = target_file_utils.FindFiles(self.shell, block_dev_path,
"recovery", "-type l")
logging.info("recovery path %s", recovery_path)
if not recovery_path:
asserts.fail("Unable to find path to recovery image on device.")
host_recovery_path = os.path.join(self.temp_dir, "recovery.img")
self.adb.pull("%s %s" % (recovery_path[0], host_recovery_path))
self.CheckImageHeader(host_recovery_path, True)
def tearDownClass(self):
"""Deletes temporary directories."""
shutil.rmtree(self.temp_dir)
if __name__ == "__main__":
test_runner.main()