# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import hashlib
import json
import os
import requests

# ==================== Document digests

def calculate_digest(doc):
    """
    Calculates a digest of the given document.

    @param doc: document's content (bytes)

    @returns the calculated digest as a string of hexadecimal digits

    """

    if ( doc[0:64].find(b'\x1B%-12345X@PJL') >= 0
            or doc[0:64].find(b'%!PS-Adobe') >= 0 ):
        # PJL or Postscript or PJL with encapsulated Postscript
        # Split by newline character and filter out problematic lines
        lines = doc.split(b'\n')
        for i, line in enumerate(lines):
            if ( line.startswith(b'@PJL SET ')
                    or line.startswith(b'@PJL COMMENT')
                    or line.startswith(b'@PJL JOB NAME')
                    or line.startswith(b'trailer << ')
                    or line.startswith(b'%%Title:')
                    or line.startswith(b'%%For:') ):
                lines[i] = b''
        doc = b'\n'.join(lines)
    elif doc[0:8] == b'\x24\x01\x00\x00\x07\x00\x00\x00':
        # LIDIL
        LIDIL_JOBID_1_OFF = 2348 # first job id, offset from the beginning
        LIDIL_JOBID_2_OFF = 2339 # second job id, offset from the end
        nd = len(doc)
        if nd > LIDIL_JOBID_1_OFF + LIDIL_JOBID_2_OFF + 2:
            # remove the second JOB ID (two bytes at the end)
            doc = doc[:(nd-LIDIL_JOBID_2_OFF)] + doc[(nd-LIDIL_JOBID_2_OFF+2):]
            # remove the first JOB ID (two bytes at the beginning)
            doc = doc[:LIDIL_JOBID_1_OFF] + doc[(LIDIL_JOBID_1_OFF+2):]
    # Calculates the hash
    return hashlib.md5(doc).hexdigest()
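

# Example (a minimal sketch; the sample payloads below are illustrative):
# two PJL documents that differ only in the volatile '@PJL JOB NAME' line
# are filtered to the same content and therefore share a digest.
def _example_calculate_digest_ignores_job_name():
    doc_1 = (b'\x1B%-12345X@PJL\n@PJL JOB NAME = "job 1"\n'
             b'@PJL ENTER LANGUAGE=PCL\n')
    doc_2 = (b'\x1B%-12345X@PJL\n@PJL JOB NAME = "job 2"\n'
             b'@PJL ENTER LANGUAGE=PCL\n')
    assert calculate_digest(doc_1) == calculate_digest(doc_2)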


def parse_digests_file(path_digests, blacklist):
    """
    Parses digests from the given file.

    @param path_digests: a path to a file with digests
    @param blacklist: list of keys to omit

    @returns a dictionary with digests indexed by PPD filenames, or an empty
            dictionary if the given file does not exist

    """
    digests = dict()
    blacklist = set(blacklist)
    if os.path.isfile(path_digests):
        with open(path_digests) as file_digests:
            lines = file_digests.read().splitlines()
            for line in lines:
                cols = line.split()
                if len(cols) >= 2 and cols[0] not in blacklist:
                    digests[cols[0]] = cols[1]
    return digests
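

# Example (a minimal sketch; the filenames and digests are illustrative):
# each line of a digests file holds a PPD filename and its digest separated
# by whitespace; blacklisted names are dropped during parsing.
def _example_parse_digests_file(path_digests):
    with open(path_digests, 'w') as file_digests:
        file_digests.write('printer_a.ppd\t' + '0' * 32 + '\n')
        file_digests.write('printer_b.ppd\t' + '1' * 32 + '\n')
    digests = parse_digests_file(path_digests, ['printer_b.ppd'])
    assert digests == {'printer_a.ppd': '0' * 32}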


def save_digests_file(path_digests, digests, blacklist):
    """
    Saves a list of digests to a file.

    @param path_digests: a path to the output file
    @param digests: dictionary with digests (keys are names)
    @param blacklist: list of keys to ignore

    """
    digests_content = ''
    names = sorted(set(digests.keys()).difference(blacklist))
    for name in names:
        digest = digests[name]
        assert '\t' not in name and '\n' not in name
        assert '\t' not in digest and '\n' not in digest
        digests_content += name + '\t' + digest + '\n'

    with open(path_digests, 'w') as file_digests:
        file_digests.write(digests_content)
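

# Example (a minimal sketch; names and path are illustrative): digests saved
# by save_digests_file() should be read back unchanged by
# parse_digests_file(), minus the blacklisted entries.
def _example_digests_round_trip(path_digests):
    digests = {'printer_a.ppd': '0' * 32, 'printer_b.ppd': '1' * 32}
    save_digests_file(path_digests, digests, ['printer_b.ppd'])
    reloaded = parse_digests_file(path_digests, [])
    assert reloaded == {'printer_a.ppd': '0' * 32}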


def load_blacklist(path_blacklist):
    """
    Loads blacklist of outputs to omit.

    Raw outputs generated by some PPD files cannot be verified by digests,
    because they contain variables like date/time, job id or other non-static
    parameters. This routine returns a list of blacklisted PPDs.

    @param path_blacklist: a path to the file with the list of blacklisted
            PPD files

    @returns a list of PPDs to ignore during verification of digests

    """
    with open(path_blacklist) as file_blacklist:
        lines = file_blacklist.readlines()

    blacklist = []
    for entry in lines:
        entry = entry.strip()
        if entry != '':
            blacklist.append(entry)

    return blacklist
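

# Example (a minimal sketch; the entries are illustrative): the blacklist
# file holds one PPD filename per line; blank lines are skipped.
def _example_load_blacklist(path_blacklist):
    with open(path_blacklist, 'w') as file_blacklist:
        file_blacklist.write('printer_a.ppd\n\nprinter_b.ppd\n')
    assert load_blacklist(path_blacklist) == \
            ['printer_a.ppd', 'printer_b.ppd']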


# ===================== PPD files on the SCS server

def get_filenames_from_PPD_index(task_id):
    """
    Downloads an index file from the SCS server and extracts the names
    of PPD files from it.

    @param task_id: an order number of an index file to process; this is
            an integer from the interval [0..20)

    @returns a list of PPD filenames (may contain duplicates)

    """
    # calculates a URL of the index file
    url_metadata = 'https://www.gstatic.com/chromeos_printing/metadata_v2/'
    url_ppd_index = url_metadata + ('index-%02d.json' % task_id)
    # downloads and parses the index file
    request = requests.get(url_ppd_index)
    entries = json.loads(request.content)
    # extracts PPD filenames (the second element in each index entry)
    output = []
    for entry in entries:
        output.append(entry[1])
    # returns a list of extracted filenames
    return output
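

# Example (a minimal sketch; requires network access to the SCS server):
# collects PPD filenames from all 20 index files and deduplicates them.
def _example_list_all_ppd_filenames():
    filenames = set()
    for task_id in range(20):
        filenames.update(get_filenames_from_PPD_index(task_id))
    return sorted(filenames)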


def download_PPD_file(ppd_file):
    """
    Downloads a PPD file from the SCS server.

    @param ppd_file: a filename of the PPD file (neither a path nor a URL)

    @returns content of the PPD file
    """
    url_ppds = 'https://www.gstatic.com/chromeos_printing/ppds/'
    request = requests.get(url_ppds + ppd_file)
    return request.content
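

# Example (a minimal sketch; requires network access to the SCS server):
# downloads the content of the first PPD file listed in index file 0.
def _example_download_first_ppd():
    ppd_files = get_filenames_from_PPD_index(0)
    return download_PPD_file(ppd_files[0])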


# ==================== Local filesystem

def list_entries_from_directory(
        path,
        with_suffixes=None, nonempty_results=False,
        include_files=True, include_directories=True ):
    """
    Returns all entries from the given directory. Results may be filtered
    by filename suffix or entry type.

    @param path: a path to the directory to list entries from
    @param with_suffixes: if set, only entries with the given suffixes are
            returned; it must be a tuple
    @param nonempty_results: if True, an exception is raised when there are
            no results
    @param include_files: if False, regular files and links are omitted
    @param include_directories: if False, directories are omitted

    @returns a list of entries meeting the given criteria

    @raises Exception if no matching filenames were found and
            nonempty_results is set to True

    """
    # lists all entries from the directory and filters them by given criteria
    list_of_files = []
    for filename in os.listdir(path):
        path_entry = os.path.join(path, filename)
        # check type
        if os.path.isfile(path_entry):
            if not include_files:
                continue
        elif os.path.isdir(path_entry):
            if not include_directories:
                continue
        else:
            continue
        # check suffix
        if with_suffixes is not None:
            if not filename.endswith(with_suffixes):
                continue
        list_of_files.append(filename)
    # raises an exception if no entries were found
    if nonempty_results and len(list_of_files) == 0:
        message = 'Directory %s does not contain any ' % path
        message += 'entries meeting the criteria'
        raise Exception(message)
    # returns the list of found entries
    return list_of_files
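

# Example (a minimal sketch; the suffixes are illustrative): lists only
# regular files with a PPD-like suffix from the given directory.
def _example_list_local_ppds(path):
    return list_entries_from_directory(
            path,
            with_suffixes=('.ppd', '.ppd.gz'),
            nonempty_results=True,
            include_directories=False)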