#!/usr/bin/env python
# Copyright (c) 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""
This utility takes a JSON input that describes a CRLSet and produces a
CRLSet from it.
The input is taken on stdin and is a dict with the following keys:
- BlockedBySPKI: An array of strings, where each string is a filename
containing a PEM certificate, from which an SPKI will be extracted.
- BlockedByHash: A dict of string to an array of ints, where the string is
a filename containing a PEM format certificate, and the ints are the
serial numbers. The listed serial numbers will be blocked when issued by
the given certificate.
For example:
{
"BlockedBySPKI": ["/tmp/blocked-certificate"],
"BlockedByHash": {
"/tmp/intermediate-certificate": [1, 2, 3]
}
}
"""
import hashlib
import json
import optparse
import struct
import sys
def _pem_cert_to_binary(pem_filename):
"""Decodes the first PEM-encoded certificate in a given file into binary
Args:
pem_filename: A filename that contains a PEM-encoded certificate. It may
contain additional data (keys, textual representation) which will be
ignored
Returns:
A byte array containing the decoded certificate data
"""
base64 = ""
started = False
with open(pem_filename, 'r') as pem_file:
for line in pem_file:
if not started:
if line.startswith('-----BEGIN CERTIFICATE'):
started = True
else:
if line.startswith('-----END CERTIFICATE'):
break
base64 += line[:-1].strip()
return base64.decode('base64')
def _parse_asn1_element(der_bytes):
"""Parses a DER-encoded tag/Length/Value into its component parts
Args:
der_bytes: A DER-encoded ASN.1 data type
Returns:
A tuple of the ASN.1 tag value, the length of the ASN.1 header that was
read, the sequence of bytes for the value, and then any data from der_bytes
that was not part of the tag/Length/Value.
"""
tag = ord(der_bytes[0])
length = ord(der_bytes[1])
header_length = 2
if length & 0x80:
num_length_bytes = length & 0x7f
length = 0
for i in xrange(2, 2 + num_length_bytes):
length <<= 8
length += ord(der_bytes[i])
header_length = 2 + num_length_bytes
contents = der_bytes[:header_length + length]
rest = der_bytes[header_length + length:]
return (tag, header_length, contents, rest)
class ASN1Iterator(object):
"""Iterator that parses and iterates through a ASN.1 DER structure"""
def __init__(self, contents):
self._tag = 0
self._header_length = 0
self._rest = None
self._contents = contents
self.step_into()
def step_into(self):
"""Begins processing the inner contents of the next ASN.1 element"""
(self._tag, self._header_length, self._contents, self._rest) = (
_parse_asn1_element(self._contents[self._header_length:]))
def step_over(self):
"""Skips/ignores the next ASN.1 element"""
(self._tag, self._header_length, self._contents, self._rest) = (
_parse_asn1_element(self._rest))
def tag(self):
"""Returns the ASN.1 tag of the current element"""
return self._tag
def contents(self):
"""Returns the raw data of the current element"""
return self._contents
def _der_cert_to_spki(der_bytes):
"""Returns the subjectPublicKeyInfo of a DER-encoded certificate
Args:
der_bytes: A DER-encoded certificate (RFC 5280)
Returns:
A byte array containing the subjectPublicKeyInfo
"""
iterator = ASN1Iterator(der_bytes)
iterator.step_into() # enter certificate structure
iterator.step_into() # enter TBSCertificate
iterator.step_over() # over version
iterator.step_over() # over serial
iterator.step_over() # over signature algorithm
iterator.step_over() # over issuer name
iterator.step_over() # over validity
iterator.step_over() # over subject name
return iterator.contents()
def pem_cert_file_to_spki_hash(pem_filename):
"""Gets the SHA-256 hash of the subjectPublicKeyInfo of a cert in a file
Args:
pem_filename: A file containing a PEM-encoded certificate.
Returns:
The SHA-256 hash of the first certificate in the file, as a byte sequence
"""
return hashlib.sha256(
_der_cert_to_spki(_pem_cert_to_binary(pem_filename))).digest()
def main():
parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
parser.add_option('-o', '--output',
help='Specifies the output file. The default is stdout.')
options, _ = parser.parse_args()
outfile = sys.stdout
if options.output and options.output != '-':
outfile = open(options.output, 'wb')
config = json.load(sys.stdin)
blocked_spkis = [
pem_cert_file_to_spki_hash(pem_file).encode('base64').strip()
for pem_file in config.get('BlockedBySPKI', [])]
parents = {
pem_cert_file_to_spki_hash(pem_file): serials
for pem_file, serials in config.get('BlockedByHash', {}).iteritems()
}
header_json = {
'Version': 0,
'ContentType': 'CRLSet',
'Sequence': 0,
'DeltaFrom': 0,
'NumParents': len(parents),
'BlockedSPKIs': blocked_spkis,
}
header = json.dumps(header_json)
outfile.write(struct.pack('<H', len(header)))
outfile.write(header)
for spki, serials in sorted(parents.iteritems()):
outfile.write(spki)
outfile.write(struct.pack('<I', len(serials)))
for serial in serials:
raw_serial = []
if not serial:
raw_serial = ['\x00']
else:
while serial:
raw_serial.insert(0, chr(serial & 0xff))
serial >>= 8
outfile.write(struct.pack('<B', len(raw_serial)))
outfile.write(''.join(raw_serial))
return 0
if __name__ == '__main__':
sys.exit(main())