普通文本  |  201行  |  5.72 KB

#!/usr/bin/env python
# Copyright (c) 2014 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This utility takes a JSON input that describes a CRLSet and produces a
CRLSet from it.

The input is taken on stdin and is a dict with the following keys:
  - BlockedBySPKI: An array of strings, where each string is a filename
      containing a PEM certificate, from which an SPKI will be extracted.
  - BlockedByHash: A dict of string to an array of ints, where the string is
      a filename containing a PEM format certificate, and the ints are the
      serial numbers. The listed serial numbers will be blocked when issued by
      the given certificate.

For example:

{
  "BlockedBySPKI": ["/tmp/blocked-certificate"],
  "BlockedByHash": {
    "/tmp/intermediate-certificate": [1, 2, 3]
  }
}
"""

import base64
import hashlib
import json
import optparse
import struct
import sys


def _pem_cert_to_binary(pem_filename):
  """Decodes the first PEM-encoded certificate in a given file into binary

  Only the lines between the first '-----BEGIN CERTIFICATE' marker and the
  following '-----END CERTIFICATE' marker (or end of file) are decoded.

  Args:
    pem_filename: A filename that contains a PEM-encoded certificate. It may
        contain additional data (keys, textual representation) which will be
        ignored

  Returns:
    A byte string containing the decoded certificate data

  Raises:
    ValueError: If the file contains no '-----BEGIN CERTIFICATE' marker.
  """
  encoded_lines = []
  started = False

  with open(pem_filename, 'r') as pem_file:
    for line in pem_file:
      if not started:
        started = line.startswith('-----BEGIN CERTIFICATE')
      elif line.startswith('-----END CERTIFICATE'):
        break
      else:
        # strip() removes the line terminator (and any stray whitespace)
        # without dropping a real base64 character when the final line of the
        # file is not newline-terminated.
        encoded_lines.append(line.strip())

  if not started:
    raise ValueError('No PEM certificate found in %s' % pem_filename)

  # base64.b64decode works on both Python 2 and Python 3, unlike the
  # Python 2-only str.decode('base64').
  return base64.b64decode(''.join(encoded_lines))


def _parse_asn1_element(der_bytes):
  """Parses a DER-encoded tag/Length/Value into its component parts

  Only single-byte tags are handled. Both the short (one byte) and long
  (0x80 | N, followed by N big-endian length bytes) length forms are decoded.
  The declared length is not checked against the amount of data available.

  Args:
    der_bytes: A DER-encoded ASN.1 data type, as a byte string

  Returns:
    A tuple of the ASN.1 tag value, the length of the ASN.1 header that was
    read, the sequence of bytes for the whole element (header included), and
    then any data from der_bytes that was not part of the tag/Length/Value.

  Raises:
    IndexError: If der_bytes ends before the tag/length header is complete.
  """
  # Indexing a bytearray yields ints on both Python 2 (where der_bytes is a
  # str) and Python 3 (where it is bytes). A header is at most 2 + 0x7f bytes,
  # so only that prefix is copied.
  header = bytearray(der_bytes[:2 + 0x7f])
  tag = header[0]
  length = header[1]
  header_length = 2

  if length & 0x80:
    # Long form: the low seven bits give the number of length bytes.
    num_length_bytes = length & 0x7f
    length = 0
    for i in range(2, 2 + num_length_bytes):
      length = (length << 8) | header[i]
    header_length = 2 + num_length_bytes

  end = header_length + length
  # Slice the caller's object so the returned pieces keep its type.
  return (tag, header_length, der_bytes[:end], der_bytes[end:])


class ASN1Iterator(object):
  """Cursor that walks the elements of an ASN.1 DER structure

  The iterator always has one current element. step_into() moves to the first
  element nested inside the current one, and step_over() moves to the element
  that follows the current one.
  """

  def __init__(self, contents):
    self._contents = contents
    self._rest = None
    self._tag = 0
    # With a zero header length, the step_into() below parses the outermost
    # element itself rather than something nested inside it.
    self._header_length = 0
    self.step_into()

  def _load(self, der_bytes):
    """Parses der_bytes and makes its first element the current one"""
    parsed = _parse_asn1_element(der_bytes)
    self._tag, self._header_length, self._contents, self._rest = parsed

  def step_into(self):
    """Begins processing the inner contents of the next ASN.1 element"""
    inner = self._contents[self._header_length:]
    self._load(inner)

  def step_over(self):
    """Skips/ignores the next ASN.1 element"""
    self._load(self._rest)

  def tag(self):
    """Returns the ASN.1 tag of the current element"""
    return self._tag

  def contents(self):
    """Returns the raw data of the current element, header included"""
    return self._contents


def _der_cert_to_spki(der_bytes):
  """Returns the subjectPublicKeyInfo of a DER-encoded certificate

  Walks the TBSCertificate fields in order until the subjectPublicKeyInfo is
  reached. The version field is optional (it is omitted from the encoding of
  v1 certificates), so it is only skipped when it is actually present.

  Args:
    der_bytes: A DER-encoded certificate (RFC 5280)

  Returns:
    A byte string containing the complete subjectPublicKeyInfo element,
    including its ASN.1 header
  """
  # version is encoded as [0] EXPLICIT: context-specific, constructed, tag 0.
  version_tag = 0xa0

  iterator = ASN1Iterator(der_bytes)
  iterator.step_into()  # enter certificate structure
  iterator.step_into()  # enter TBSCertificate
  if iterator.tag() == version_tag:
    iterator.step_over()  # over version
  iterator.step_over()  # over serial
  iterator.step_over()  # over signature algorithm
  iterator.step_over()  # over issuer name
  iterator.step_over()  # over validity
  iterator.step_over()  # over subject name
  return iterator.contents()


def pem_cert_file_to_spki_hash(pem_filename):
  """Computes the SHA-256 digest of a certificate's subjectPublicKeyInfo

  Args:
    pem_filename: A file containing a PEM-encoded certificate.

  Returns:
    The raw SHA-256 digest, as a byte sequence, of the subjectPublicKeyInfo
    of the first certificate in the file
  """
  der_cert = _pem_cert_to_binary(pem_filename)
  spki = _der_cert_to_spki(der_cert)
  return hashlib.sha256(spki).digest()


def _serial_to_bytes(serial):
  """Encodes a serial number as a minimal big-endian byte string

  Args:
    serial: A non-negative integer serial number.

  Returns:
    The big-endian bytes of the serial, with zero encoded as a single
    zero byte.

  Raises:
    ValueError: If serial is negative (the shift loop would never finish).
  """
  if serial < 0:
    raise ValueError('Serial numbers must be non-negative: %r' % (serial,))
  octets = bytearray()
  while serial:
    octets.append(serial & 0xff)
    serial >>= 8
  octets.reverse()
  return bytes(octets) or b'\x00'


def main():
  """Reads a JSON CRLSet description from stdin and writes the CRLSet

  The output is a little-endian 16-bit header length, the JSON header, and
  then, for each parent sorted by SPKI hash: the SPKI hash, a little-endian
  32-bit serial count, and each serial as a one-byte length followed by its
  big-endian bytes.

  Returns:
    0 on success, for use as the process exit code.
  """
  parser = optparse.OptionParser(description=sys.modules[__name__].__doc__)
  parser.add_option('-o', '--output',
                    help='Specifies the output file. The default is stdout.')
  options, _ = parser.parse_args()

  to_stdout = not options.output or options.output == '-'
  if to_stdout:
    # Python 3 exposes the binary stream as sys.stdout.buffer; Python 2's
    # sys.stdout already accepts byte strings.
    outfile = getattr(sys.stdout, 'buffer', sys.stdout)
  else:
    outfile = open(options.output, 'wb')

  try:
    config = json.load(sys.stdin)
    blocked_spkis = [
        base64.b64encode(pem_cert_file_to_spki_hash(pem_file)).decode('ascii')
        for pem_file in config.get('BlockedBySPKI', [])]
    parents = {
      pem_cert_file_to_spki_hash(pem_file): serials
      for pem_file, serials in config.get('BlockedByHash', {}).items()
    }
    header_json = {
        'Version': 0,
        'ContentType': 'CRLSet',
        'Sequence': 0,
        'DeltaFrom': 0,
        'NumParents': len(parents),
        'BlockedSPKIs': blocked_spkis,
    }
    # json.dumps escapes non-ASCII by default, so the header is pure ASCII.
    header = json.dumps(header_json).encode('ascii')
    outfile.write(struct.pack('<H', len(header)))
    outfile.write(header)
    for spki, serials in sorted(parents.items()):
      outfile.write(spki)
      outfile.write(struct.pack('<I', len(serials)))
      for serial in serials:
        # Every serial must be written, matching the count emitted above.
        raw_serial = _serial_to_bytes(serial)
        outfile.write(struct.pack('<B', len(raw_serial)))
        outfile.write(raw_serial)
  finally:
    if to_stdout:
      outfile.flush()
    else:
      outfile.close()
  return 0


if __name__ == '__main__':
  sys.exit(main())