#!/usr/bin/python
#
# Copyright 2017 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Test that implements the Android Things Attestation Provisioning protocol.
Enables testing of the device side of the Android Things Attestation
Provisioning (ATAP) Protocol without access to a CA or Android Things Factory
Appliance (ATFA).
"""
import argparse
from collections import namedtuple
import os
import struct
from aesgcm import AESGCM
import cryptography.exceptions
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
import curve25519
import ec_helper
_ATAPSessionParameters = namedtuple('_AtapSessionParameters', [
'algorithm', 'operation', 'private_key', 'public_key'
])
_MESSAGE_VERSION = 1
_OPERATIONS = {'ISSUE': 2, 'ISSUE_ENC': 3}
_ALGORITHMS = {'p256': 1, 'x25519': 2}
_ECDH_KEY_LEN = 33
_session_params = _ATAPSessionParameters(0, 0, bytes(), bytes())
def _write_operation_start(algorithm, operation):
"""Writes a fresh Operation Start message to tmp/operation_start.bin.
Generates an ECDHE key specified by <algorithm> and writes an Operation
Start message for executing <operation> on the device.
Args:
algorithm: Integer specifying the curve to use for the session key.
1: P256, 2: X25519
operation: Specifies the operation. 1: Certify, 2: Issue, 3: Issue Encrypted
Raises:
ValueError: algorithm or operation is is invalid.
"""
global _session_params
if algorithm > 2 or algorithm < 1:
raise ValueError('Invalid algorithm value.')
if operation > 3 or operation < 1:
raise ValueError('Invalid operation value.')
# Generate new key for each provisioning session
if algorithm == _ALGORITHMS['x25519']:
private_key = curve25519.genkey()
# Make 33 bytes to match P256
public_key = curve25519.public(private_key) + '\0'
elif algorithm == _ALGORITHMS['p256']:
[private_key, public_key] = ec_helper.generate_p256_key()
_session_params = _ATAPSessionParameters(algorithm, operation, private_key,
public_key)
# "Operation Start" Header
# +2 for algo and operation bytes
header = (_MESSAGE_VERSION, 0, 0, 0, _ECDH_KEY_LEN + 2)
operation_start = bytearray(struct.pack('<4B I', *header))
# "Operation Start" Message
op_start = (algorithm, operation, public_key)
operation_start.extend(struct.pack('<2B 33s', *op_start))
with open('tmp/operation_start.bin', 'wb') as f:
f.write(operation_start)
def _get_ca_response(ca_request):
"""Writes a CA Response message to tmp/ca_response.bin.
Parses the CA Request message at ca_request. Computes the session key from
the ca_request, decrypts the inner request, verifies the SOM key signature,
and issues or certifies attestation keys as applicable. The CA Response
message containing test keys is written to ca_response.bin.
Args:
ca_request: The CA Request message from the device.
Raises:
ValueError: ca_request is malformed.
CA Request message format for reference, sizes in bytes
cleartext header 8
cleartext device ephemeral public key 33
cleartext GCM IV 12
encrypted header 8
encrypted SOM key certificate chain variable
encrypted SOM key authentication signature variable
encrypted product ID SHA256 hash 32
encrypted RSA public key variable
encrypted ECDSA public key variable
encrypted edDSA public key variable
cleartext GCM tag 16
"""
var_len = 4
header_len = 8
pub_key_len = _ECDH_KEY_LEN
gcm_iv_len = 12
prod_id_hash_len = 32
gcm_tag_len = 16
min_message_length = (
header_len + pub_key_len + gcm_iv_len + header_len + var_len + var_len +
prod_id_hash_len + var_len + var_len + var_len + gcm_tag_len)
if len(ca_request) < min_message_length:
raise ValueError('Malformed message: Length invalid')
# Unpack Request header
end = header_len
ca_req_start = ca_request[:end]
(device_message_version, res1, res2, res3,
device_message_len) = struct.unpack('<4B I', ca_req_start)
if device_message_version != _MESSAGE_VERSION:
raise ValueError('Malformed message: Incorrect message version')
if res1 or res2 or res3:
raise ValueError('Malformed message: Reserved values set')
if device_message_len > len(ca_request) - header_len:
raise ValueError('Malformed message: Incorrect device message length')
# Extract AT device ephemeral public key
start = header_len
end = start + pub_key_len
device_pub_key = bytes(ca_request[start:end])
# Generate shared_key
salt = _session_params.public_key + device_pub_key
shared_key = _get_shared_key(_session_params.algorithm, device_pub_key, salt)
# Decrypt AES-128-GCM message using the shared_key
# Extract the GCM IV
start = header_len + pub_key_len
end = start + gcm_iv_len
gcm_iv = bytes(ca_request[start:end])
# Extract the encrypted message
start = header_len + pub_key_len + gcm_iv_len
enc_message_len = _get_var_len(ca_request, start)
if enc_message_len > len(ca_request) - gcm_tag_len - start - var_len:
raise ValueError('Encrypted message size %d too large' % enc_message_len)
start += var_len
end = start + enc_message_len
enc_message = bytes(ca_request[start:end])
# Extract the GCM Tag
gcm_tag = bytes(ca_request[-gcm_tag_len:])
# Decrypt message
try:
data = AESGCM.decrypt(enc_message, shared_key, gcm_iv, gcm_tag)
except cryptography.exceptions.InvalidTag:
raise ValueError('Malformed message: GCM decrypt failed')
# Unpack Inner header
end = header_len
ca_req_inner_header = data[:end]
(device_message_version, res1, res2, res3, inner_message_len) = struct.unpack(
'<4B I', ca_req_inner_header)
if device_message_version != _MESSAGE_VERSION:
raise ValueError('Malformed message: Incorrect inner message version')
if res1 or res2 or res3:
raise ValueError('Malformed message: Reserved values set')
remaining_bytes = len(ca_request) - header_len - pub_key_len
remaining_bytes = remaining_bytes - gcm_iv_len - gcm_tag_len
if inner_message_len > remaining_bytes:
raise ValueError('Malformed message: Incorrect device inner message length')
# SOM key certificate chain
som_chain_start = header_len
som_chain_len = _get_var_len(data, som_chain_start)
if som_chain_len > 0:
raise ValueError(
'SOM authentication not yet supported, set cert chain length to zero')
# SOM key authentication signature
som_key_start = som_chain_start + var_len + som_chain_len
som_len = _get_var_len(data, som_key_start)
if som_len > 0:
raise ValueError(
'SOM authentication not yet supported, set signature length to zero')
# Product ID SHA-256 hash
prod_id_start = som_key_start + var_len + som_len
prod_id_end = prod_id_start + prod_id_hash_len
prod_id_hash = data[prod_id_start:prod_id_end]
print 'product_id hash:' + prod_id_hash.encode('hex')
# RSA public key to certify
rsa_start = prod_id_start + prod_id_hash_len
rsa_len = _get_var_len(data, rsa_start)
if rsa_len > 0:
raise ValueError(
'Certify operation not supported, set RSA public key length to zero')
# ECDSA public key to certify
ecdsa_start = rsa_start + var_len + rsa_len
ecdsa_len = _get_var_len(data, ecdsa_start)
if ecdsa_len > 0:
raise ValueError(
'Certify operation not supported, set ECDSA public key length to zero')
# edDSA public key to certify
eddsa_start = prod_id_start + var_len + prod_id_hash_len
eddsa_len = _get_var_len(data, eddsa_start)
if eddsa_len > 0:
raise ValueError(
'Certify operation not supported, set edDSA public key length to zero')
# ATFA treats ISSUE and ISSUE_ENCRYPTED operations the same
if _session_params.operation == _OPERATIONS['ISSUE']:
with open('keysets/unencrypted.keyset', 'rb') as infile:
inner_ca_response = bytes(infile.read())
elif _session_params.operation == _OPERATIONS['ISSUE_ENC']:
with open('keysets/encrypted.keyset', 'rb') as infile:
inner_ca_response = bytes(infile.read())
(gcm_iv, encrypted_keyset, gcm_tag) = AESGCM.encrypt(inner_ca_response,
shared_key)
# "CA Response" Header
# +2 for algo and operation bytes
header = (_MESSAGE_VERSION, 0, 0, 0, 12 + 4 + len(encrypted_keyset) + 16)
ca_response = bytearray(struct.pack('<4B I', *header))
struct_fmt = '12s I %ds 16s' % len(inner_ca_response)
message = (gcm_iv, len(encrypted_keyset), encrypted_keyset, gcm_tag)
ca_response.extend(struct.pack(struct_fmt, *message))
with open('tmp/ca_response.bin', 'wb') as f:
f.write(ca_response)
def _get_shared_key(algorithm,
device_pub_key,
hkdf_salt,
hkdf_info='KEY',
hkdf_hash_len=16):
"""Generates the shared key based on ECDH and HKDF.
Uses a particular ECDH algorithm and HKDF-SHA256 to create a shared key
Args:
algorithm: p256 or curve25519
device_pub_key: ephemeral public key from the AT device
hkdf_salt: salt to use in the HKDF operation
hkdf_info: info value to use in the HKDF operation
hkdf_hash_len: length of the outputted hash value for use as a shared key
Raises:
RuntimeError: Computing the shared secret fails.
Returns:
The shared key.
"""
if algorithm == _ALGORITHMS['p256']:
ecdhe_shared_secret = ec_helper.compute_p256_shared_secret(
_session_params.private_key, device_pub_key)
elif algorithm == _ALGORITHMS['x25519']:
device_pub_key = device_pub_key[:-1]
ecdhe_shared_secret = curve25519.shared(_session_params.private_key,
device_pub_key)
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=hkdf_hash_len,
salt=hkdf_salt,
info=hkdf_info,
backend=default_backend())
shared_key = hkdf.derive(ecdhe_shared_secret)
return shared_key
def _get_var_len(data, index):
"""Reads the 4 byte little endian unsigned integer at data[index].
Args:
data: Start of bytearray
index: Offset that indicates where the integer begins
Returns:
Little endian unsigned integer at data[index]
"""
return struct.unpack('<I', data[index:index + 4])[0]
def main():
parser = argparse.ArgumentParser(
description='Test for Android Things key provisioning.')
parser.add_argument(
'-a',
'--algorithm',
type=str,
choices=['p256', 'x25519'],
required=True,
dest='algorithm',
help='Algorithm for deriving the ECDHE shared secret')
parser.add_argument(
'-s',
'--serial',
type=str,
required=True,
dest='serial',
help='Fastboot serial device',
metavar='FASTBOOT_SERIAL_NUMBER')
parser.add_argument(
'-o',
'--operation',
type=str,
default='ISSUE',
choices=['ISSUE', 'ISSUE_ENC'],
dest='operation',
help='Operation for provisioning the device')
results = parser.parse_args()
fastboot_device = results.serial
algorithm = _ALGORITHMS[results.algorithm]
operation = _OPERATIONS[results.operation]
_write_operation_start(algorithm, operation)
print 'Wrote Operation Start message to tmp/operation_start.bin'
os.system('fastboot -s %s stage tmp/operation_start.bin' % fastboot_device)
os.system('fastboot -s %s oem at-get-ca-request' % fastboot_device)
os.system('fastboot -s %s get_staged tmp/ca_request.bin' % fastboot_device)
with open('tmp/ca_request.bin', 'rb') as f:
ca_request = bytearray(f.read())
_get_ca_response(ca_request)
print 'Wrote CA Response message to tmp/ca_response.bin'
os.system('fastboot -s %s stage tmp/ca_response.bin' % fastboot_device)
os.system('fastboot -s %s oem at-set-ca-response' % fastboot_device)
os.system('fastboot -s %s getvar at-attest-uuid' % fastboot_device)
if __name__ == '__main__':
main()