#!/usr/bin/env python # # Copyright 2018 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Unit tests for at_auth_unlock.""" import argparse import filecmp import os import shutil import subprocess import unittest from at_auth_unlock import * from Crypto.PublicKey import RSA from unittest.mock import patch def dataPath(file): return os.path.join(os.path.dirname(__file__), 'data', file) DATA_FILE_PIK_CERTIFICATE = dataPath('atx_pik_certificate.bin') DATA_FILE_PUK_CERTIFICATE = dataPath('atx_puk_certificate.bin') DATA_FILE_PUK_KEY = dataPath('testkey_atx_puk.pem') DATA_FILE_UNLOCK_CHALLENGE = dataPath('atx_unlock_challenge.bin') DATA_FILE_UNLOCK_CREDENTIAL = dataPath('atx_unlock_credential.bin') def createTempZip(contents): tempzip = tempfile.NamedTemporaryFile() with zipfile.ZipFile(tempzip, 'w') as zip: for arcname in contents: zip.write(contents[arcname], arcname) return tempzip def validUnlockCredsZip(): return createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) class UnlockCredentialsTest(unittest.TestCase): def testFromValidZipArchive(self): with validUnlockCredsZip() as zip: creds = UnlockCredentials.from_credential_archive(zip) self.assertIsNotNone(creds.intermediate_cert) self.assertIsNotNone(creds.unlock_cert) self.assertIsNotNone(creds.unlock_key) def testFromInvalidZipArchive(self): with self.assertRaises(zipfile.BadZipfile): UnlockCredentials.from_credential_archive(DATA_FILE_PUK_KEY) def testFromArchiveMissingPikCertificate(self): with createTempZip({ 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as zip: with self.assertRaises(ValueError): UnlockCredentials.from_credential_archive(zip) def testFromArchiveMissingPukCertificate(self): with createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as zip: with self.assertRaises(ValueError): UnlockCredentials.from_credential_archive(zip) def testFromArchiveMissingPuk(self): with createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, }) as zip: with self.assertRaises(ValueError): UnlockCredentials.from_credential_archive(zip) def testFromArchiveMultiplePikCertificates(self): with createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'pik_certificate_v2.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as zip: with self.assertRaises(ValueError): UnlockCredentials.from_credential_archive(zip) def testFromArchiveMultiplePukCertificates(self): with createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 'puk_certificate_v2.bin': DATA_FILE_PUK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as zip: with self.assertRaises(ValueError): UnlockCredentials.from_credential_archive(zip) def testFromArchiveMultiplePuks(self): with createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_certificate_v1.bin': DATA_FILE_PUK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY, 'puk_v2.pem': DATA_FILE_PUK_KEY }) as zip: with self.assertRaises(ValueError): UnlockCredentials.from_credential_archive(zip) def testFromFiles(self): creds = UnlockCredentials( intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, unlock_key_file=DATA_FILE_PUK_KEY) self.assertIsNotNone(creds.intermediate_cert) self.assertIsNotNone(creds.unlock_cert) self.assertIsNotNone(creds.unlock_key) def testInvalidPuk(self): with self.assertRaises(ValueError): UnlockCredentials( intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, unlock_key_file=DATA_FILE_PUK_CERTIFICATE) def testPukNotPrivateKey(self): tempdir = tempfile.mkdtemp() try: with open(DATA_FILE_PUK_KEY, 'rb') as f: key = RSA.importKey(f.read()) pubkey = os.path.join(tempdir, 'pubkey.pub') with open(pubkey, 'wb') as f: f.write(key.publickey().exportKey()) with self.assertRaises(ValueError): UnlockCredentials( intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, unlock_key_file=pubkey) finally: shutil.rmtree(tempdir) def testWrongSizeCerts(self): pik_cert = DATA_FILE_PIK_CERTIFICATE tempdir = tempfile.mkdtemp() try: # Copy a valid cert and truncate a single byte from the end to create a # too-short cert. shortfile = os.path.join(tempdir, 'shortfile.bin') shutil.copy2(pik_cert, shortfile) with open(shortfile, 'ab') as f: f.seek(-1, os.SEEK_END) f.truncate() with self.assertRaises(ValueError): creds = UnlockCredentials( intermediate_cert_file=shortfile, unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, unlock_key_file=DATA_FILE_PUK_KEY) with self.assertRaises(ValueError): creds = UnlockCredentials( intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, unlock_cert_file=shortfile, unlock_key_file=DATA_FILE_PUK_KEY) # Copy a valid cert and append an arbitrary byte on the end to create a # too-long cert. longfile = os.path.join(tempdir, 'longfile.bin') shutil.copy2(pik_cert, longfile) with open(longfile, 'ab') as f: f.write(b'\0') with self.assertRaises(ValueError): creds = UnlockCredentials( intermediate_cert_file=longfile, unlock_cert_file=DATA_FILE_PUK_CERTIFICATE, unlock_key_file=DATA_FILE_PUK_KEY) with self.assertRaises(ValueError): creds = UnlockCredentials( intermediate_cert_file=DATA_FILE_PIK_CERTIFICATE, unlock_cert_file=longfile, unlock_key_file=DATA_FILE_PUK_KEY) finally: shutil.rmtree(tempdir) def writeFullUnlockChallenge(out_file, product_id_hash=None): """Helper function to create a file with a full AvbAtxUnlockChallenge struct. Arguments: product_id_hash: [optional] 32 byte value to include in the challenge as the SHA256 hash of the product ID. If not provided, will default to the product ID hash from the subject of DATA_FILE_PUK_CERTIFICATE. """ if product_id_hash is None: with open(DATA_FILE_PUK_CERTIFICATE, 'rb') as f: product_id_hash = GetAtxCertificateSubject(f.read()) assert len(product_id_hash) == 32 with open(out_file, 'wb') as out: out.write(struct.pack('<I', 1)) out.write(product_id_hash) with open(DATA_FILE_UNLOCK_CHALLENGE, 'rb') as f: out.write(f.read()) class MakeAtxUnlockCredentialTest(unittest.TestCase): def testCredentialIsCorrect(self): with validUnlockCredsZip() as zip: creds = UnlockCredentials.from_credential_archive(zip) tempdir = tempfile.mkdtemp() try: challenge_file = os.path.join(tempdir, 'challenge') writeFullUnlockChallenge(challenge_file) challenge = UnlockChallenge(challenge_file) out_cred = os.path.join(tempdir, 'credential') # Compare unlock credential generated by function with one generated # using 'avbtool make_atx_unlock_credential', to check correctness. MakeAtxUnlockCredential(creds, challenge, out_cred) self.assertTrue(filecmp.cmp(out_cred, DATA_FILE_UNLOCK_CREDENTIAL)) finally: shutil.rmtree(tempdir) def testWrongChallengeSize(self): with validUnlockCredsZip() as zip: creds = UnlockCredentials.from_credential_archive(zip) tempdir = tempfile.mkdtemp() try: out_cred = os.path.join(tempdir, 'credential') # The bundled unlock challenge is just the 16 byte challenge, not the # full AvbAtxUnlockChallenge like this expects. with self.assertRaises(ValueError): challenge = UnlockChallenge(DATA_FILE_UNLOCK_CHALLENGE) MakeAtxUnlockCredential(creds, challenge, out_cred) finally: shutil.rmtree(tempdir) def makeFastbootCommandFake(testcase, expect_serial=None, error_on_command_number=None, product_id_hash=None, stay_locked=False): """Construct a fake fastboot command handler, to be used with unitttest.mock.Mock.side_effect. This can be used to create a callable that acts as a fake for a real device responding to the fastboot commands involved in an authenticated unlock. The returned callback is intended to be used with unittest.mock.Mock.side_effect. There are a number of optional arguments here that can be used to customize the behavior of the fake for a specific test. Arguments: testcase: unittest.TestCase object for the associated test expect_serial: [optional] Expect (and assert) that the fastboot command specifies a specific device serial to communicate with. error_on_command_number: [optional] Return a fastboot error (non-zero exit code) on the nth (0-based) command handled. stay_locked: [optional] Make the fake report that the device is still locked after an otherwise successful unlock attempt. """ def handler(args, *extraArgs, **kwargs): if error_on_command_number is not None: handler.command_counter += 1 if handler.command_counter - 1 == error_on_command_number: raise subprocess.CalledProcessError( returncode=1, cmd=args, output=b'Fake: ERROR') testcase.assertEqual(args.pop(0), 'fastboot') if expect_serial is not None: # This is a bit fragile in that, in reality, fastboot allows '-s SERIAL' # to not just be the first arguments, but it works for this use case. testcase.assertEqual(args.pop(0), '-s') testcase.assertEqual(args.pop(0), expect_serial) if args[0:2] == ['oem', 'at-get-vboot-unlock-challenge']: handler.challenge_staged = True elif args[0] == 'get_staged': if not handler.challenge_staged: raise subprocess.CalledProcessError( returncode=1, cmd=args, output=b'Fake: No data staged') writeFullUnlockChallenge(args[1], product_id_hash=product_id_hash) handler.challenge_staged = False elif args[0] == 'stage': handler.staged_file = args[1] elif args[0:2] == ['oem', 'at-unlock-vboot']: if handler.staged_file is None: raise subprocess.CalledProcessError( returncode=1, cmd=args, output=b'Fake: No unlock credential staged') # Validate the unlock credential as if this were a test key locked device, # which implies tests that want a successful unlock need to be set up to # use DATA_FILE_PUK_KEY to sign the challenge. Credentials generated using # other keys will be properly rejected. if not filecmp.cmp(handler.staged_file, DATA_FILE_UNLOCK_CREDENTIAL): raise subprocess.CalledProcessError( returncode=1, cmd=args, output=b'Fake: Incorrect unlock credential') handler.locked = True if stay_locked else False elif args[0:2] == ['getvar', 'at-vboot-state']: return b'avb-locked: ' + (b'1' if handler.locked else b'0') return b'Fake: OK' handler.command_counter = 0 handler.challenge_staged = False handler.staged_file = None handler.locked = True return handler class AuthenticatedUnlockTest(unittest.TestCase): @patch('subprocess.check_output') def testUnlockWithZipArchive(self, mock_subp_check_output): with validUnlockCredsZip() as zip: mock_subp_check_output.side_effect = makeFastbootCommandFake(self) self.assertEqual(main([zip.name]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) @patch('subprocess.check_output') def testUnlockDeviceBySerial(self, mock_subp_check_output): with validUnlockCredsZip() as zip: SERIAL = 'abcde12345' mock_subp_check_output.side_effect = makeFastbootCommandFake( self, expect_serial=SERIAL) self.assertEqual(main([zip.name, '-s', SERIAL]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) @patch('subprocess.check_output') def testUnlockWithIndividualFiles(self, mock_subp_check_output): mock_subp_check_output.side_effect = makeFastbootCommandFake(self) self.assertEqual( main([ '--pik_cert', DATA_FILE_PIK_CERTIFICATE, '--puk_cert', DATA_FILE_PUK_CERTIFICATE, '--puk', DATA_FILE_PUK_KEY ]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) @patch('subprocess.check_output') def testFastbootError(self, mock_subp_check_output): """Verify that errors are handled properly if fastboot commands error out.""" with validUnlockCredsZip() as zip: for n in range(5): mock_subp_check_output.reset_mock() mock_subp_check_output.side_effect = makeFastbootCommandFake( self, error_on_command_number=n) self.assertNotEqual(main([zip.name]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) @patch('subprocess.check_output') def testDoesntActuallyUnlock(self, mock_subp_check_output): """Verify fails if fake set to not actually unlock.""" with validUnlockCredsZip() as zip: mock_subp_check_output.side_effect = makeFastbootCommandFake( self, stay_locked=True) self.assertNotEqual(main([zip.name]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) @patch('subprocess.check_output') def testNoCredentialsMatchDeviceProductID(self, mock_subp_check_output): """Test two cases where fake responds with a challenge that has a product ID hash which doesn't match the credentials used.""" # Case 1: Change the product ID hash that the fake responds with. with validUnlockCredsZip() as zip: mock_subp_check_output.side_effect = makeFastbootCommandFake( self, product_id_hash=b'\x00' * 32) self.assertNotEqual(main([zip.name]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) # Case 2: Use credentials with a different product ID. with createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) is # different 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as zip: mock_subp_check_output.side_effect = makeFastbootCommandFake(self) self.assertNotEqual(main([zip.name]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) @patch('subprocess.check_output') def testMatchingCredentialSelectedFromZipArchives(self, mock_subp_check_output): """Test correct credential based on product ID hash used if multiple provided directly through arguments.""" with validUnlockCredsZip() as correctCreds, createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) # doesn't match 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as wrongCreds: mock_subp_check_output.side_effect = makeFastbootCommandFake(self) self.assertEqual(main([wrongCreds.name, correctCreds.name]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) @patch('subprocess.check_output') def testMatchingCredentialSelectedFromDirectory(self, mock_subp_check_output): """Test correct credential based on product ID hash used if multiple provided indirectly through a directory argument.""" with validUnlockCredsZip() as correctCreds, createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) # doesn't match 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as wrongCreds: tempdir = tempfile.mkdtemp() try: shutil.copy2(correctCreds.name, tempdir) shutil.copy2(wrongCreds.name, tempdir) mock_subp_check_output.side_effect = makeFastbootCommandFake(self) self.assertEqual(main([tempdir]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) finally: shutil.rmtree(tempdir) @patch('subprocess.check_output') def testMatchingCredentialSelectedFromEither(self, mock_subp_check_output): """Test correct credential based on product ID hash used if arguments give some combination of file and directory arguments.""" with validUnlockCredsZip() as correctCreds, createTempZip({ 'pik_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, # Note: PIK cert used as PUK cert so subject (i.e. product ID hash) # doesn't match 'puk_certificate_v1.bin': DATA_FILE_PIK_CERTIFICATE, 'puk_v1.pem': DATA_FILE_PUK_KEY }) as wrongCreds: # Case 1: Correct creds in directory, wrong in file arg tempdir = tempfile.mkdtemp() try: shutil.copy2(correctCreds.name, tempdir) mock_subp_check_output.side_effect = makeFastbootCommandFake(self) self.assertEqual(main([wrongCreds.name, tempdir]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) finally: shutil.rmtree(tempdir) # Case 2: Correct creds in file arg, wrong in directory tempdir = tempfile.mkdtemp() try: shutil.copy2(wrongCreds.name, tempdir) mock_subp_check_output.side_effect = makeFastbootCommandFake(self) self.assertEqual(main([tempdir, correctCreds.name]), 0) self.assertNotEqual(mock_subp_check_output.call_count, 0) # Case 2: Correct creds in file arg, wrong in directory finally: shutil.rmtree(tempdir) @patch('argparse.ArgumentParser.error') def testArgparseDirectoryWithNoCredentials(self, mock_parser_error): """Test """ tempdir = tempfile.mkdtemp() try: # Make sure random files are ignored. with open(os.path.join(tempdir, 'so_random'), 'w') as f: f.write("I'm a random file") mock_parser_error.side_effect = ValueError('ArgumentParser.error') with self.assertRaises(ValueError): main([tempdir]) self.assertEqual(mock_parser_error.call_count, 1) finally: shutil.rmtree(tempdir) @patch('argparse.ArgumentParser.error') def testArgparseMutualExclusionArchiveAndFiles(self, mock_parser_error): mock_parser_error.side_effect = ValueError('ArgumentParser.error') with self.assertRaises(ValueError): main(['dummy.zip', '--pik_cert', DATA_FILE_PIK_CERTIFICATE]) self.assertEqual(mock_parser_error.call_count, 1) @patch('argparse.ArgumentParser.error') def testArgparseMutualInclusionOfFileArgs(self, mock_parser_error): mock_parser_error.side_effect = ValueError('ArgumentParser.error') with self.assertRaises(ValueError): main(['--pik_cert', 'pik_cert.bin', '--puk_cert', 'puk_cert.bin']) self.assertEqual(mock_parser_error.call_count, 1) mock_parser_error.reset_mock() with self.assertRaises(ValueError): main(['--pik_cert', 'pik_cert.bin', '--puk', 'puk.pem']) self.assertEqual(mock_parser_error.call_count, 1) mock_parser_error.reset_mock() with self.assertRaises(ValueError): main(['--puk_cert', 'puk_cert.bin', '--puk', 'puk.pem']) self.assertEqual(mock_parser_error.call_count, 1) @patch('argparse.ArgumentParser.error') def testArgparseMissingBundleAndFiles(self, mock_parser_error): mock_parser_error.side_effect = ValueError('ArgumentParser.error') with self.assertRaises(ValueError): main(['-s', '1234abcd']) self.assertEqual(mock_parser_error.call_count, 1) if __name__ == '__main__': unittest.main(verbosity=3)