#!/usr/bin/env python # # Copyright 2018 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Helper tool for performing an authenticated AVB unlock of an Android Things device. This tool communicates with an Android Things device over fastboot to perform an authenticated AVB unlock. The user provides unlock credentials valid for the device they want to unlock, likely obtained from the Android Things Developer Console. The tool handles the sequence of fastboot commands to complete the challenge-response unlock protocol. Unlock credentials can be provided to the tool in one of two ways: 1) by providing paths to the individual credential files using the '--pik_cert', '--puk_cert', and '--puk' command line swtiches, or 2) by providing a path to a zip archive containing the three credential files, named as follows: - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin' - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin' - PUK private key: 'puk.*\.pem' You can also provide one or more archives and/or one or more directories containing such zip archives. In either scenario, the tool will search all of the provided credential archives for a match against the product ID of the device being unlocked and automatically use the first match. Dependencies: - Python 2.7.x, 3.2.x, or newer (for argparse) - PyCrypto 2.5 or newer (for PKCS1_v1_5 and RSA PKCS#8 PEM key import) - Android SDK Platform Tools (for fastboot), in PATH - https://developer.android.com/studio/releases/platform-tools """ HELP_DESCRIPTION = """Performs an authenticated AVB unlock of an Android Things device over fastboot, given valid unlock credentials for the device.""" HELP_USAGE = """ %(prog)s [-h] [-v] [-s SERIAL] unlock_creds.zip [unlock_creds_2.zip ...] %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem""" HELP_EPILOG = """examples: %(prog)s unlock_creds.zip %(prog)s unlock_creds.zip unlock_creds_2.zip -s SERIAL %(prog)s path_to_dir_with_multiple_unlock_creds/ %(prog)s --pik_cert pik_cert.bin --puk_cert puk_cert.bin --puk puk.pem""" import sys ver = sys.version_info if (ver[0] < 2) or (ver[0] == 2 and ver[1] < 7) or (ver[0] == 3 and ver[1] < 2): print('This script requires Python 2.7+ or 3.2+') sys.exit(1) import argparse import binascii import os import re import shutil import struct import subprocess import tempfile import zipfile # Requires PyCrypto 2.5 (or newer) for PKCS1_v1_5 and support for importing # PEM-encoded RSA keys try: from Crypto.Hash import SHA512 from Crypto.PublicKey import RSA from Crypto.Signature import PKCS1_v1_5 except ImportError as e: print('PyCrypto 2.5 or newer required, missing or too old: ' + str(e)) class UnlockCredentials(object): """Helper data container class for the 3 unlock credentials involved in an AVB authenticated unlock operation. """ def __init__(self, intermediate_cert_file, unlock_cert_file, unlock_key_file, source_file=None): # The certificates are AvbAtxCertificate structs as defined in libavb_atx, # not an X.509 certificate. Do a basic length sanity check when reading # them. EXPECTED_CERTIFICATE_SIZE = 1620 with open(intermediate_cert_file, 'rb') as f: self._intermediate_cert = f.read() if len(self._intermediate_cert) != EXPECTED_CERTIFICATE_SIZE: raise ValueError('Invalid intermediate key certificate length.') with open(unlock_cert_file, 'rb') as f: self._unlock_cert = f.read() if len(self._unlock_cert) != EXPECTED_CERTIFICATE_SIZE: raise ValueError('Invalid product unlock key certificate length.') with open(unlock_key_file, 'rb') as f: self._unlock_key = RSA.importKey(f.read()) if not self._unlock_key.has_private(): raise ValueError('Unlock key was not an RSA private key.') self._source_file = source_file @property def intermediate_cert(self): return self._intermediate_cert @property def unlock_cert(self): return self._unlock_cert @property def unlock_key(self): return self._unlock_key @property def source_file(self): return self._source_file @classmethod def from_credential_archive(cls, archive): """Create UnlockCredentials from an unlock credential zip archive. The zip archive must contain the following three credential files, named as follows: - Product Intermediate Key (PIK) certificate: 'pik_certificate.*\.bin' - Product Unlock Key (PUK) certificate: 'puk_certificate.*\.bin' - PUK private key: 'puk.*\.pem' This uses @contextlib.contextmanager so we can clean up the tempdir created to unpack the zip contents into. Arguments: - archive: Filename of zip archive containing unlock credentials. Raises: ValueError: If archive is either missing a required file or contains multiple files matching one of the filename formats. """ def _find_one_match(contents, regex, desc): r = re.compile(regex) matches = list(filter(r.search, contents)) if not matches: raise ValueError( "Couldn't find {} file (matching regex '{}') in archive {}".format( desc, regex, archive)) elif len(matches) > 1: raise ValueError( "Found multiple files for {} (matching regex '{}') in archive {}" .format(desc, regex, archive)) return matches[0] tempdir = tempfile.mkdtemp() try: with zipfile.ZipFile(archive, mode='r') as zip: contents = zip.namelist() pik_cert_re = r'^pik_certificate.*\.bin$' pik_cert = _find_one_match(contents, pik_cert_re, 'intermediate key (PIK) certificate') puk_cert_re = r'^puk_certificate.*\.bin$' puk_cert = _find_one_match(contents, puk_cert_re, 'unlock key (PUK) certificate') puk_re = r'^puk.*\.pem$' puk = _find_one_match(contents, puk_re, 'unlock key (PUK)') zip.extractall(path=tempdir, members=[pik_cert, puk_cert, puk]) return cls( intermediate_cert_file=os.path.join(tempdir, pik_cert), unlock_cert_file=os.path.join(tempdir, puk_cert), unlock_key_file=os.path.join(tempdir, puk), source_file=archive) finally: shutil.rmtree(tempdir) class UnlockChallenge(object): """Helper class for parsing the AvbAtxUnlockChallenge struct returned from 'fastboot oem at-get-vboot-unlock-challenge'. The file provided to the constructor should be the full 52-byte AvbAtxUnlockChallenge struct, not just the challenge itself. """ def __init__(self, challenge_file): CHALLENGE_STRUCT_SIZE = 52 PRODUCT_ID_HASH_SIZE = 32 CHALLENGE_DATA_SIZE = 16 with open(challenge_file, 'rb') as f: data = f.read() if len(data) != CHALLENGE_STRUCT_SIZE: raise ValueError('Invalid unlock challenge length.') self._version, self._product_id_hash, self._challenge_data = struct.unpack( '<I{}s{}s'.format(PRODUCT_ID_HASH_SIZE, CHALLENGE_DATA_SIZE), data) @property def version(self): return self._version @property def product_id_hash(self): return self._product_id_hash @property def challenge_data(self): return self._challenge_data def GetAtxCertificateSubject(cert): """Parses and returns the subject field from the given AvbAtxCertificate struct.""" CERT_SUBJECT_OFFSET = 4 + 1032 # Format version and public key come before subject CERT_SUBJECT_LENGTH = 32 return cert[CERT_SUBJECT_OFFSET:CERT_SUBJECT_OFFSET + CERT_SUBJECT_LENGTH] def SelectMatchingUnlockCredential(all_creds, challenge): """Find and return the first UnlockCredentials object whose product ID matches that of the unlock challenge. The Product Unlock Key (PUK) certificate's subject field contains the SHA256 hash of the product ID that it can be used to unlock. This same value (SHA256 hash of the product ID) is contained in the unlock challenge. Arguments: all_creds: List of UnlockCredentials objects to be searched for a match against the given challenge. challenge: UnlockChallenge object created from challenge obtained via 'fastboot oem at-get-vboot-unlock-challenge'. """ for creds in all_creds: if GetAtxCertificateSubject(creds.unlock_cert) == challenge.product_id_hash: return creds def MakeAtxUnlockCredential(creds, challenge, out_file): """Simple reimplementation of 'avbtool make_atx_unlock_credential'. Generates an Android Things authenticated unlock credential to authorize unlocking AVB on a device. This is reimplemented locally for simplicity, which avoids the need to bundle this tool with the full avbtool. avbtool also uses openssl by default whereas this uses PyCrypto, which makes it easier to support Windows since there are no officially supported openssl binary distributions. Arguments: creds: UnlockCredentials object wrapping the PIK certificate, PUK certificate, and PUK private key. challenge: UnlockChallenge object created from challenge obtained via 'fastboot oem at-get-vboot-unlock-challenge'. out_file: Output filename to write the AvbAtxUnlockCredential struct to. Raises: ValueError: If challenge has wrong length. """ hash = SHA512.new(challenge.challenge_data) signer = PKCS1_v1_5.new(creds.unlock_key) signature = signer.sign(hash) with open(out_file, 'wb') as out: out.write(struct.pack('<I', 1)) # Format Version out.write(creds.intermediate_cert) out.write(creds.unlock_cert) out.write(signature) def AuthenticatedUnlock(all_creds, serial=None, verbose=False): """Performs an authenticated AVB unlock of a device over fastboot. Arguments: all_creds: List of UnlockCredentials objects wrapping the PIK certificate, PUK certificate, and PUK private key. The list will be searched to find matching credentials for the device being unlocked. serial: [optional] A device serial number or other valid value to be passed to fastboot's '-s' switch to select the device to unlock. verbose: [optional] Enable verbose output, which prints the fastboot commands and their output as the commands are run. """ tempdir = tempfile.mkdtemp() try: challenge_file = os.path.join(tempdir, 'challenge') credential_file = os.path.join(tempdir, 'credential') def fastboot_cmd(args): args = ['fastboot'] + (['-s', serial] if serial else []) + args if verbose: print('\n$ ' + ' '.join(args)) out = subprocess.check_output( args, stderr=subprocess.STDOUT).decode('utf-8') if verbose: print(out) return out try: fastboot_cmd(['oem', 'at-get-vboot-unlock-challenge']) fastboot_cmd(['get_staged', challenge_file]) challenge = UnlockChallenge(challenge_file) print('Product ID SHA256 hash = {}'.format( binascii.hexlify(challenge.product_id_hash))) selected_cred = SelectMatchingUnlockCredential(all_creds, challenge) if not selected_cred: print( 'ERROR: None of the provided unlock credentials match this device.') return False if selected_cred.source_file: print('Found matching unlock credentials: {}'.format( selected_cred.source_file)) MakeAtxUnlockCredential(selected_cred, challenge, credential_file) fastboot_cmd(['stage', credential_file]) fastboot_cmd(['oem', 'at-unlock-vboot']) res = fastboot_cmd(['getvar', 'at-vboot-state']) if re.search(r'avb-locked(:\s*|=)0', res) is not None: print('Device successfully AVB unlocked') return True else: print('ERROR: Commands succeeded but device still locked') return False except subprocess.CalledProcessError as e: print(e.output.decode('utf-8')) print("Command '{}' returned non-zero exit status {}".format( ' '.join(e.cmd), e.returncode)) return False finally: shutil.rmtree(tempdir) def FindUnlockCredentialsInDirectory(dir, verbose=False): if not os.path.isdir(dir): raise ValueError('Not a directory: ' + dir) creds = [] for file in os.listdir(dir): path = os.path.join(dir, file) if os.path.isfile(path): try: creds.append(UnlockCredentials.from_credential_archive(path)) if verbose: print('Found valid unlock credential bundle: ' + path) except (IOError, ValueError, zipfile.BadZipfile) as e: if verbose: print( "Ignoring file which isn't a valid unlock credential zip bundle: " + path) return creds def main(in_args): parser = argparse.ArgumentParser( description=HELP_DESCRIPTION, usage=HELP_USAGE, epilog=HELP_EPILOG, formatter_class=argparse.RawDescriptionHelpFormatter) # General optional arguments. parser.add_argument( '-v', '--verbose', action='store_true', help= 'enable verbose output, e.g. prints fastboot commands and their output') parser.add_argument( '-s', '--serial', help= "specify device to unlock, either by serial or any other valid value for fastboot's -s arg" ) # User must provide either a unlock credential bundle, or the individual files # normally contained in such a bundle. # argparse doesn't support specifying this argument format - two groups of # mutually exclusive arguments, where one group requires all arguments in that # group to be specified - so we define them as optional arguments and do the # validation ourselves below. # Argument group #1 - Unlock credential zip archive(s) (or directory # containing multiple such archives) parser.add_argument( 'bundle', metavar='unlock_creds.zip', nargs='*', help= 'Unlock using a zip bundle/archive of credentials (e.g. from Developer ' 'Console). You can optionally provide multiple archives and/or a ' 'directory of such bundles and the tool will automatically select the ' 'correct one to use based on matching the product ID against the device ' 'being unlocked.') # Argument group #2 - Individual credential files parser.add_argument( '--pik_cert', metavar='pik_cert.bin', help='Path to product intermediate key (PIK) certificate file') parser.add_argument( '--puk_cert', metavar='puk_cert.bin', help='Path to product unlock key (PUK) certificate file') parser.add_argument( '--puk', metavar='puk.pem', help='Path to product unlock key in PEM format') # Print help if no args given args = parser.parse_args(in_args if in_args else ['-h']) # Do the custom validation described above. if args.pik_cert is not None or args.puk_cert is not None or args.puk is not None: # Check mutual exclusion with bundle positional argument if len(args.bundle): parser.error( 'bundle argument is mutually exclusive with --pik_cert, --puk_cert, and --puk' ) # Check for 'mutual inclusion' of individual file options if args.pik_cert is None: parser.error("--pik_cert is required if --puk_cert or --puk' is given") if args.puk_cert is None: parser.error("--puk_cert is required if --pik_cert or --puk' is given") if args.puk is None: parser.error("--puk is required if --pik_cert or --puk_cert' is given") elif not len(args.bundle): parser.error( 'must provide either credentials bundle or individual credential files') # Parse arguments into UnlockCredentials objects if len(args.bundle): creds = [] for path in args.bundle: if os.path.isfile(path): creds.append(UnlockCredentials.from_credential_archive(path)) elif os.path.isdir(path): creds.extend( FindUnlockCredentialsInDirectory(path, verbose=args.verbose)) else: parser.error("path argument '{}' does not exist".format(path)) if len(creds) == 0: parser.error('No unlock credentials were found in any of the given paths') else: creds = [UnlockCredentials(args.pik_cert, args.puk_cert, args.puk)] ret = AuthenticatedUnlock(creds, serial=args.serial, verbose=args.verbose) return 0 if ret else 1 if __name__ == '__main__': sys.exit(main(sys.argv[1:]))