普通文本  |  697行  |  23.33 KB

#!/usr/bin/python
"""
Cartesian configuration format file parser.

 Filter syntax:
 , means OR
 .. means AND
 . means IMMEDIATELY-FOLLOWED-BY

 Example:
 qcow2..Fedora.14, RHEL.6..raw..boot, smp2..qcow2..migrate..ide
 means match all dicts whose names have:
 (qcow2 AND (Fedora IMMEDIATELY-FOLLOWED-BY 14)) OR
 ((RHEL IMMEDIATELY-FOLLOWED-BY 6) AND raw AND boot) OR
 (smp2 AND qcow2 AND migrate AND ide)

 Note:
 'qcow2..Fedora.14' is equivalent to 'Fedora.14..qcow2'.
 'qcow2..Fedora.14' is not equivalent to 'qcow2..14.Fedora'.
 'ide, scsi' is equivalent to 'scsi, ide'.

 Filters can be used in 3 ways:
 only <filter>
 no <filter>
 <filter>:
 The last one starts a conditional block.

@copyright: Red Hat 2008-2011
"""

import re, os, sys, optparse, collections

class ParserError:
    def __init__(self, msg, line=None, filename=None, linenum=None):
        self.msg = msg
        self.line = line
        self.filename = filename
        self.linenum = linenum

    def __str__(self):
        if self.line:
            return "%s: %r (%s:%s)" % (self.msg, self.line,
                                       self.filename, self.linenum)
        else:
            return "%s (%s:%s)" % (self.msg, self.filename, self.linenum)


num_failed_cases = 5


class Node(object):
    def __init__(self):
        self.name = []
        self.dep = []
        self.content = []
        self.children = []
        self.labels = set()
        self.append_to_shortname = False
        self.failed_cases = collections.deque()


def _match_adjacent(block, ctx, ctx_set):
    # TODO: explain what this function does
    if block[0] not in ctx_set:
        return 0
    if len(block) == 1:
        return 1
    if block[1] not in ctx_set:
        return int(ctx[-1] == block[0])
    k = 0
    i = ctx.index(block[0])
    while i < len(ctx):
        if k > 0 and ctx[i] != block[k]:
            i -= k - 1
            k = 0
        if ctx[i] == block[k]:
            k += 1
            if k >= len(block):
                break
            if block[k] not in ctx_set:
                break
        i += 1
    return k


def _might_match_adjacent(block, ctx, ctx_set, descendant_labels):
    matched = _match_adjacent(block, ctx, ctx_set)
    for elem in block[matched:]:
        if elem not in descendant_labels:
            return False
    return True


# Filter must inherit from object (otherwise type() won't work)
class Filter(object):
    def __init__(self, s):
        self.filter = []
        for char in s:
            if not (char.isalnum() or char.isspace() or char in ".,_-"):
                raise ParserError("Illegal characters in filter")
        for word in s.replace(",", " ").split():
            word = [block.split(".") for block in word.split("..")]
            for block in word:
                for elem in block:
                    if not elem:
                        raise ParserError("Syntax error")
            self.filter += [word]


    def match(self, ctx, ctx_set):
        for word in self.filter:
            for block in word:
                if _match_adjacent(block, ctx, ctx_set) != len(block):
                    break
            else:
                return True
        return False


    def might_match(self, ctx, ctx_set, descendant_labels):
        for word in self.filter:
            for block in word:
                if not _might_match_adjacent(block, ctx, ctx_set,
                                             descendant_labels):
                    break
            else:
                return True
        return False


class NoOnlyFilter(Filter):
    def __init__(self, line):
        Filter.__init__(self, line.split(None, 1)[1])
        self.line = line


class OnlyFilter(NoOnlyFilter):
    def is_irrelevant(self, ctx, ctx_set, descendant_labels):
        return self.match(ctx, ctx_set)


    def requires_action(self, ctx, ctx_set, descendant_labels):
        return not self.might_match(ctx, ctx_set, descendant_labels)


    def might_pass(self, failed_ctx, failed_ctx_set, ctx, ctx_set,
                   descendant_labels):
        for word in self.filter:
            for block in word:
                if (_match_adjacent(block, ctx, ctx_set) >
                    _match_adjacent(block, failed_ctx, failed_ctx_set)):
                    return self.might_match(ctx, ctx_set, descendant_labels)
        return False


class NoFilter(NoOnlyFilter):
    def is_irrelevant(self, ctx, ctx_set, descendant_labels):
        return not self.might_match(ctx, ctx_set, descendant_labels)


    def requires_action(self, ctx, ctx_set, descendant_labels):
        return self.match(ctx, ctx_set)


    def might_pass(self, failed_ctx, failed_ctx_set, ctx, ctx_set,
                   descendant_labels):
        for word in self.filter:
            for block in word:
                if (_match_adjacent(block, ctx, ctx_set) <
                    _match_adjacent(block, failed_ctx, failed_ctx_set)):
                    return not self.match(ctx, ctx_set)
        return False


class Condition(NoFilter):
    def __init__(self, line):
        Filter.__init__(self, line.rstrip(":"))
        self.line = line
        self.content = []


class NegativeCondition(OnlyFilter):
    def __init__(self, line):
        Filter.__init__(self, line.lstrip("!").rstrip(":"))
        self.line = line
        self.content = []


class Parser(object):
    """
    Parse an input file or string that follows the Cartesian Config File format
    and generate a list of dicts that will be later used as configuration
    parameters by autotest tests that use that format.

    @see: http://autotest.kernel.org/wiki/CartesianConfig
    """

    def __init__(self, filename=None, debug=False):
        """
        Initialize the parser and optionally parse a file.

        @param filename: Path of the file to parse.
        @param debug: Whether to turn on debugging output.
        """
        self.node = Node()
        self.debug = debug
        if filename:
            self.parse_file(filename)


    def parse_file(self, filename):
        """
        Parse a file.

        @param filename: Path of the configuration file.
        """
        self.node = self._parse(FileReader(filename), self.node)


    def parse_string(self, s):
        """
        Parse a string.

        @param s: String to parse.
        """
        self.node = self._parse(StrReader(s), self.node)


    def get_dicts(self, node=None, ctx=[], content=[], shortname=[], dep=[]):
        """
        Generate dictionaries from the code parsed so far.  This should
        be called after parsing something.

        @return: A dict generator.
        """
        def process_content(content, failed_filters):
            # 1. Check that the filters in content are OK with the current
            #    context (ctx).
            # 2. Move the parts of content that are still relevant into
            #    new_content and unpack conditional blocks if appropriate.
            #    For example, if an 'only' statement fully matches ctx, it
            #    becomes irrelevant and is not appended to new_content.
            #    If a conditional block fully matches, its contents are
            #    unpacked into new_content.
            # 3. Move failed filters into failed_filters, so that next time we
            #    reach this node or one of its ancestors, we'll check those
            #    filters first.
            for t in content:
                filename, linenum, obj = t
                if type(obj) is Op:
                    new_content.append(t)
                    continue
                # obj is an OnlyFilter/NoFilter/Condition/NegativeCondition
                if obj.requires_action(ctx, ctx_set, labels):
                    # This filter requires action now
                    if type(obj) is OnlyFilter or type(obj) is NoFilter:
                        self._debug("    filter did not pass: %r (%s:%s)",
                                    obj.line, filename, linenum)
                        failed_filters.append(t)
                        return False
                    else:
                        self._debug("    conditional block matches: %r (%s:%s)",
                                    obj.line, filename, linenum)
                        # Check and unpack the content inside this Condition
                        # object (note: the failed filters should go into
                        # new_internal_filters because we don't expect them to
                        # come from outside this node, even if the Condition
                        # itself was external)
                        if not process_content(obj.content,
                                               new_internal_filters):
                            failed_filters.append(t)
                            return False
                        continue
                elif obj.is_irrelevant(ctx, ctx_set, labels):
                    # This filter is no longer relevant and can be removed
                    continue
                else:
                    # Keep the filter and check it again later
                    new_content.append(t)
            return True

        def might_pass(failed_ctx,
                       failed_ctx_set,
                       failed_external_filters,
                       failed_internal_filters):
            for t in failed_external_filters:
                if t not in content:
                    return True
                filename, linenum, filter = t
                if filter.might_pass(failed_ctx, failed_ctx_set, ctx, ctx_set,
                                     labels):
                    return True
            for t in failed_internal_filters:
                filename, linenum, filter = t
                if filter.might_pass(failed_ctx, failed_ctx_set, ctx, ctx_set,
                                     labels):
                    return True
            return False

        def add_failed_case():
            node.failed_cases.appendleft((ctx, ctx_set,
                                          new_external_filters,
                                          new_internal_filters))
            if len(node.failed_cases) > num_failed_cases:
                node.failed_cases.pop()

        node = node or self.node
        # Update dep
        for d in node.dep:
            dep = dep + [".".join(ctx + [d])]
        # Update ctx
        ctx = ctx + node.name
        ctx_set = set(ctx)
        labels = node.labels
        # Get the current name
        name = ".".join(ctx)
        if node.name:
            self._debug("checking out %r", name)
        # Check previously failed filters
        for i, failed_case in enumerate(node.failed_cases):
            if not might_pass(*failed_case):
                self._debug("    this subtree has failed before")
                del node.failed_cases[i]
                node.failed_cases.appendleft(failed_case)
                return
        # Check content and unpack it into new_content
        new_content = []
        new_external_filters = []
        new_internal_filters = []
        if (not process_content(node.content, new_internal_filters) or
            not process_content(content, new_external_filters)):
            add_failed_case()
            return
        # Update shortname
        if node.append_to_shortname:
            shortname = shortname + node.name
        # Recurse into children
        count = 0
        for n in node.children:
            for d in self.get_dicts(n, ctx, new_content, shortname, dep):
                count += 1
                yield d
        # Reached leaf?
        if not node.children:
            self._debug("    reached leaf, returning it")
            d = {"name": name, "dep": dep, "shortname": ".".join(shortname)}
            for filename, linenum, op in new_content:
                op.apply_to_dict(d)
            yield d
        # If this node did not produce any dicts, remember the failed filters
        # of its descendants
        elif not count:
            new_external_filters = []
            new_internal_filters = []
            for n in node.children:
                (failed_ctx,
                 failed_ctx_set,
                 failed_external_filters,
                 failed_internal_filters) = n.failed_cases[0]
                for obj in failed_internal_filters:
                    if obj not in new_internal_filters:
                        new_internal_filters.append(obj)
                for obj in failed_external_filters:
                    if obj in content:
                        if obj not in new_external_filters:
                            new_external_filters.append(obj)
                    else:
                        if obj not in new_internal_filters:
                            new_internal_filters.append(obj)
            add_failed_case()


    def _debug(self, s, *args):
        if self.debug:
            s = "DEBUG: %s" % s
            print s % args


    def _warn(self, s, *args):
        s = "WARNING: %s" % s
        print s % args


    def _parse_variants(self, cr, node, prev_indent=-1):
        """
        Read and parse lines from a FileReader object until a line with an
        indent level lower than or equal to prev_indent is encountered.

        @param cr: A FileReader/StrReader object.
        @param node: A node to operate on.
        @param prev_indent: The indent level of the "parent" block.
        @return: A node object.
        """
        node4 = Node()

        while True:
            line, indent, linenum = cr.get_next_line(prev_indent)
            if not line:
                break

            name, dep = map(str.strip, line.lstrip("- ").split(":", 1))
            for char in name:
                if not (char.isalnum() or char in "@._-"):
                    raise ParserError("Illegal characters in variant name",
                                      line, cr.filename, linenum)
            for char in dep:
                if not (char.isalnum() or char.isspace() or char in ".,_-"):
                    raise ParserError("Illegal characters in dependencies",
                                      line, cr.filename, linenum)

            node2 = Node()
            node2.children = [node]
            node2.labels = node.labels

            node3 = self._parse(cr, node2, prev_indent=indent)
            node3.name = name.lstrip("@").split(".")
            node3.dep = dep.replace(",", " ").split()
            node3.append_to_shortname = not name.startswith("@")

            node4.children += [node3]
            node4.labels.update(node3.labels)
            node4.labels.update(node3.name)

        return node4


    def _parse(self, cr, node, prev_indent=-1):
        """
        Read and parse lines from a StrReader object until a line with an
        indent level lower than or equal to prev_indent is encountered.

        @param cr: A FileReader/StrReader object.
        @param node: A Node or a Condition object to operate on.
        @param prev_indent: The indent level of the "parent" block.
        @return: A node object.
        """
        while True:
            line, indent, linenum = cr.get_next_line(prev_indent)
            if not line:
                break

            words = line.split(None, 1)

            # Parse 'variants'
            if line == "variants:":
                # 'variants' is not allowed inside a conditional block
                if (isinstance(node, Condition) or
                    isinstance(node, NegativeCondition)):
                    raise ParserError("'variants' is not allowed inside a "
                                      "conditional block",
                                      None, cr.filename, linenum)
                node = self._parse_variants(cr, node, prev_indent=indent)
                continue

            # Parse 'include' statements
            if words[0] == "include":
                if len(words) < 2:
                    raise ParserError("Syntax error: missing parameter",
                                      line, cr.filename, linenum)
                filename = os.path.expanduser(words[1])
                if isinstance(cr, FileReader) and not os.path.isabs(filename):
                    filename = os.path.join(os.path.dirname(cr.filename),
                                            filename)
                if not os.path.isfile(filename):
                    self._warn("%r (%s:%s): file doesn't exist or is not a "
                               "regular file", line, cr.filename, linenum)
                    continue
                node = self._parse(FileReader(filename), node)
                continue

            # Parse 'only' and 'no' filters
            if words[0] in ("only", "no"):
                if len(words) < 2:
                    raise ParserError("Syntax error: missing parameter",
                                      line, cr.filename, linenum)
                try:
                    if words[0] == "only":
                        f = OnlyFilter(line)
                    elif words[0] == "no":
                        f = NoFilter(line)
                except ParserError, e:
                    e.line = line
                    e.filename = cr.filename
                    e.linenum = linenum
                    raise
                node.content += [(cr.filename, linenum, f)]
                continue

            # Look for operators
            op_match = _ops_exp.search(line)

            # Parse conditional blocks
            if ":" in line:
                index = line.index(":")
                if not op_match or index < op_match.start():
                    index += 1
                    cr.set_next_line(line[index:], indent, linenum)
                    line = line[:index]
                    try:
                        if line.startswith("!"):
                            cond = NegativeCondition(line)
                        else:
                            cond = Condition(line)
                    except ParserError, e:
                        e.line = line
                        e.filename = cr.filename
                        e.linenum = linenum
                        raise
                    self._parse(cr, cond, prev_indent=indent)
                    node.content += [(cr.filename, linenum, cond)]
                    continue

            # Parse regular operators
            if not op_match:
                raise ParserError("Syntax error", line, cr.filename, linenum)
            node.content += [(cr.filename, linenum, Op(line, op_match))]

        return node


# Assignment operators

_reserved_keys = set(("name", "shortname", "dep"))


def _op_set(d, key, value):
    if key not in _reserved_keys:
        d[key] = value


def _op_append(d, key, value):
    if key not in _reserved_keys:
        d[key] = d.get(key, "") + value


def _op_prepend(d, key, value):
    if key not in _reserved_keys:
        d[key] = value + d.get(key, "")


def _op_regex_set(d, exp, value):
    exp = re.compile("%s$" % exp)
    for key in d:
        if key not in _reserved_keys and exp.match(key):
            d[key] = value


def _op_regex_append(d, exp, value):
    exp = re.compile("%s$" % exp)
    for key in d:
        if key not in _reserved_keys and exp.match(key):
            d[key] += value


def _op_regex_prepend(d, exp, value):
    exp = re.compile("%s$" % exp)
    for key in d:
        if key not in _reserved_keys and exp.match(key):
            d[key] = value + d[key]


def _op_regex_del(d, empty, exp):
    exp = re.compile("%s$" % exp)
    for key in d.keys():
        if key not in _reserved_keys and exp.match(key):
            del d[key]


_ops = {"=": (r"\=", _op_set),
        "+=": (r"\+\=", _op_append),
        "<=": (r"\<\=", _op_prepend),
        "?=": (r"\?\=", _op_regex_set),
        "?+=": (r"\?\+\=", _op_regex_append),
        "?<=": (r"\?\<\=", _op_regex_prepend),
        "del": (r"^del\b", _op_regex_del)}

_ops_exp = re.compile("|".join([op[0] for op in _ops.values()]))


class Op(object):
    def __init__(self, line, m):
        self.func = _ops[m.group()][1]
        self.key = line[:m.start()].strip()
        value = line[m.end():].strip()
        if value and (value[0] == value[-1] == '"' or
                      value[0] == value[-1] == "'"):
            value = value[1:-1]
        self.value = value


    def apply_to_dict(self, d):
        self.func(d, self.key, self.value)


# StrReader and FileReader

class StrReader(object):
    """
    Preprocess an input string for easy reading.
    """
    def __init__(self, s):
        """
        Initialize the reader.

        @param s: The string to parse.
        """
        self.filename = "<string>"
        self._lines = []
        self._line_index = 0
        self._stored_line = None
        for linenum, line in enumerate(s.splitlines()):
            line = line.rstrip().expandtabs()
            stripped_line = line.lstrip()
            indent = len(line) - len(stripped_line)
            if (not stripped_line
                or stripped_line.startswith("#")
                or stripped_line.startswith("//")):
                continue
            self._lines.append((stripped_line, indent, linenum + 1))


    def get_next_line(self, prev_indent):
        """
        Get the next line in the current block.

        @param prev_indent: The indentation level of the previous block.
        @return: (line, indent, linenum), where indent is the line's
            indentation level.  If no line is available, (None, -1, -1) is
            returned.
        """
        if self._stored_line:
            ret = self._stored_line
            self._stored_line = None
            return ret
        if self._line_index >= len(self._lines):
            return None, -1, -1
        line, indent, linenum = self._lines[self._line_index]
        if indent <= prev_indent:
            return None, -1, -1
        self._line_index += 1
        return line, indent, linenum


    def set_next_line(self, line, indent, linenum):
        """
        Make the next call to get_next_line() return the given line instead of
        the real next line.
        """
        line = line.strip()
        if line:
            self._stored_line = line, indent, linenum


class FileReader(StrReader):
    """
    Preprocess an input file for easy reading.
    """
    def __init__(self, filename):
        """
        Initialize the reader.

        @parse filename: The name of the input file.
        """
        StrReader.__init__(self, open(filename).read())
        self.filename = filename


if __name__ == "__main__":
    parser = optparse.OptionParser('usage: %prog [options] filename '
                                   '[extra code] ...\n\nExample:\n\n    '
                                   '%prog tests.cfg "only my_set" "no qcow2"')
    parser.add_option("-v", "--verbose", dest="debug", action="store_true",
                      help="include debug messages in console output")
    parser.add_option("-f", "--fullname", dest="fullname", action="store_true",
                      help="show full dict names instead of short names")
    parser.add_option("-c", "--contents", dest="contents", action="store_true",
                      help="show dict contents")

    options, args = parser.parse_args()
    if not args:
        parser.error("filename required")

    c = Parser(args[0], debug=options.debug)
    for s in args[1:]:
        c.parse_string(s)

    for i, d in enumerate(c.get_dicts()):
        if options.fullname:
            print "dict %4d:  %s" % (i + 1, d["name"])
        else:
            print "dict %4d:  %s" % (i + 1, d["shortname"])
        if options.contents:
            keys = d.keys()
            keys.sort()
            for key in keys:
                print "    %s = %s" % (key, d[key])