普通文本  |  403行  |  13.88 KB

#!/usr/bin/python
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse, math, re, sys
import xml.etree.ElementTree as ET
from collections import defaultdict, namedtuple
import itertools


def createLookup(values, key):
  """Groups the items of an iterable by a computed key.

  Arguments:
    values: any iterable of items to group.
    key: a one-argument callable; its return value for an item is the
      (hashable) key under which that item is filed.

  Returns:
    A defaultdict(list) mapping each key to the list of items that produced
    it. Items within a list keep the order in which they were iterated.
  """
  table = defaultdict(list)
  for item in values:
    bucket = table[key(item)]
    bucket.append(item)
  return table


def _intify(value):
  """Converts value with int() when that succeeds.

  Returns:
    The int conversion of value, or value itself (unchanged) when int()
    rejects it with a ValueError.
  """
  try:
    converted = int(value)
  except ValueError:
    return value
  else:
    return converted


class Size(namedtuple('Size', 'width height')):
  """An immutable (width, height) pair.

  str() renders it as "<width>x<height>" with both fields formatted as
  integers.
  """
  def __str__(self):
    w, h = self
    return '%dx%d' % (w, h)


class _VideoResultBase(object):
  """Helper methods for results. Not for use by applications.

  Subclasses must implement _parseFailureLine() and _parseMessage(), which
  parse() calls for every failure-log line and every ValueArray message.

  Attributes:
    codec: The name of the codec (string) or None
    size: Size representing the video size or None
    mime: The mime-type of the codec (string) or None
    rates: The measured achievable frame rates
    is_decoder: True iff codec is a decoder.
  """

  # Matches a single "key=value" item. The value is either a brace-enclosed
  # group (no nested braces) or a run of characters other than space, comma
  # and braces.
  MESSAGE_PATTERN = r'(?P<key>\w+)=(?P<value>\{[^}]*\}|[^ ,{}]+)'

  def __init__(self, is_decoder):
    self.codec = None
    self.mime = None
    self.size = None
    self._rates_from_failure = []
    self._rates_from_message = []
    self.is_decoder = is_decoder

  def _inited(self):
    """Returns true iff codec, mime and size was set."""
    return None not in (self.codec, self.mime, self.size)

  def __len__(self):
    """Returns the number of usable rates.

    This is 0 while codec, mime or size is still unknown, which also makes
    such a result falsy.
    """
    # don't report any result if codec name, mime type and size is unclear
    if not self._inited():
      return 0
    return len(self.rates)

  @property
  def rates(self):
    """Rates from the failure log if there are any, else from the messages."""
    return self._rates_from_failure or self._rates_from_message

  @staticmethod
  def _warnUnparsable(line):
    """Reports a line that could not be parsed on standard error."""
    # A plain stream write (rather than the print statement) behaves the same
    # on Python 2 and Python 3.
    sys.stderr.write('could not parse line %s\n' % repr(line))

  def _parseDict(self, value):
    """Parses a MediaFormat from its string representation sans brackets.

    Arguments:
      value: a string of "key=value" items separated by ", ".

    Returns:
      A dict with string keys; values are ints where they convert cleanly and
      strings otherwise.
    """
    return dict((k, _intify(v))
                for k, v in re.findall(r'([^ =]+)=([^ [=]+(?:|\[[^\]]+\]))(?:, |$)', value))

  def _cleanFormat(self, format):
    """Removes internal fields from a parsed MediaFormat (in place)."""
    format.pop('what', None)
    format.pop('image-data', None)

  def _parsePartialResult(self, message_match):
    """Parses a partial test result conforming to the message pattern.

    Arguments:
      message_match: a regex match object with 'key' and 'value' groups.

    Returns:
      A tuple of string key and int, string or dict value, where dict has
      string keys mapping to int or string values.
    """
    key, value = message_match.group('key', 'value')
    if value.startswith('{'):
      # Strip the enclosing braces before parsing the contents.
      value = self._parseDict(value[1:-1])
      if key.endswith('Format'):
        self._cleanFormat(value)
    else:
      value = _intify(value)
    return key, value

  def _parseValuesFromBracket(self, line):
    """Returns the values enclosed in brackets without the brackets.

    Parses a line matching the pattern "<tag>: [<values>]." (note the
    trailing period) and returns <values>.

    Raises:
      ValueError: if the line does not match the pattern.
    """
    matched = re.match(r'^[^:]+: *\[(?P<values>.*)\]\.$', line)
    if matched is None:
      raise ValueError('line does not match "tag: [value]": %s' % line)
    return matched.group('values')

  def _parseRawData(self, line):
    """Parses the raw data line for video performance tests.

    A line that does not have the bracketed form is reported on standard
    error and yields nothing.

    Yields:
      Dict objects corresponding to parsed results, mapping string keys to
      int, string or dict values.
    """
    try:
      values = self._parseValuesFromBracket(line)
    except ValueError:
      self._warnUnparsable(line)
      return
    result = {}
    for m in re.finditer(self.MESSAGE_PATTERN + r'(?P<sep>,? +|$)', values):
      key, value = self._parsePartialResult(m)
      result[key] = value
      # Items of one result are separated by exactly one space; any other
      # separator (or the end of the values) completes the current result.
      if m.group('sep') != ' ':
        yield result
        result = {}

  def _tryParseMeasuredFrameRate(self, line):
    """Parses a line starting with 'Measured frame rate:'.

    On success the comma-separated values replace the rates from the failure
    log. Lines with a different prefix are ignored; a line that has the
    prefix but cannot be parsed is reported on standard error.
    """
    if not line.startswith('Measured frame rate: '):
      return
    try:
      values = self._parseValuesFromBracket(line)
      self._rates_from_failure = [float(v) for v in re.split(r' *, *', values)]
    except ValueError:
      self._warnUnparsable(line)

  def parse(self, test):
    """Parses the ValueArray and FailedScene lines of a test result.

    Arguments:
      test: An ElementTree <Test> element.
    """
    failure = test.find('FailedScene')
    if failure is not None:
      trace = failure.find('StackTrace')
      # An empty <StackTrace/> element has text None; there is nothing to
      # split in that case.
      if trace is not None and trace.text is not None:
        for line in re.split(r'[\r\n]+', trace.text):
          self._parseFailureLine(line)
    details = test.find('Details')
    if details is not None:
      for array in details.iter('ValueArray'):
        message = array.get('message')
        # A ValueArray without a message attribute has nothing to parse.
        if message is None:
          continue
        self._parseMessage(message, array)

  def _parseFailureLine(self, line):
    """Handles one line of the failure log. Must be overridden."""
    raise NotImplementedError

  def _parseMessage(self, message, array):
    """Handles the message of one ValueArray element. Must be overridden."""
    raise NotImplementedError

  def getData(self):
    """Gets the parsed test result data.

    Yields:
       Result objects containing at least codec, size, mime and rates attributes."""
    yield self


class VideoEncoderDecoderTestResult(_VideoResultBase):
  """Represents a result from a VideoEncoderDecoderTest performance case.

  If a VideoEncoderDecoderTest succeeds, it provides the results in the
  message of a ValueArray. If it fails, it provides the results in the
  failure log using raw data. (For now it also includes some data in the
  ValueArrays even if it fails, which we ignore.)
  """

  def __init__(self, unused_m):
    """Creates an empty encoder result.

    Arguments:
      unused_m: accepted so all result classes share one constructor
        signature; not used.
    """
    super(VideoEncoderDecoderTestResult, self).__init__(is_decoder=False)

  def _parseFailureLine(self, line):
    """Handles parsing a line from the failure log.

    Picks up measured frame rates, and takes codec, size and mime from each
    result found in a 'Raw data: ' line (a later result overrides an earlier
    one).
    """
    self._tryParseMeasuredFrameRate(line)
    if line.startswith('Raw data: '):
      for result in self._parseRawData(line):
        fmt = result['EncOutputFormat']
        self.size = Size(fmt['width'], fmt['height'])
        self.codec = result['codec']
        self.mime = fmt['mime']

  def _parseMessage(self, message, array):
    """Handles parsing a message from ValueArrays.

    Only messages that start with 'codec=' and contain an EncInputFormat are
    used; each contributes one rate computed from its 'min' value.

    Arguments:
      message: the message attribute of the ValueArray; may be None when the
        attribute is missing, in which case the array is ignored.
      array: the ValueArray element (unused here).
    """
    if not message or not message.startswith('codec='):
      return
    result = dict(self._parsePartialResult(m)
                  for m in re.finditer(self.MESSAGE_PATTERN + '(?: |$)', message))
    if 'EncInputFormat' not in result:
      return
    self.codec = result['codec']
    fmt = result['EncInputFormat']
    self.size = Size(fmt['width'], fmt['height'])
    self.mime = result['EncOutputFormat']['mime']
    # presumably 'min' is a per-frame duration in microseconds, making this
    # a frames-per-second value -- TODO confirm against the test output.
    self._rates_from_message.append(1000000. / result['min'])


class VideoDecoderPerfTestResult(_VideoResultBase):
  """Represents a result from a VideoDecoderPerfTest performance case.

  If a VideoDecoderPerfTest succeeds, it provides the results in the message
  of a ValueArray. If it fails, it provides the results in the failure only,
  using raw data.
  """

  def __init__(self, unused_m):
    """Creates an empty decoder result.

    Arguments:
      unused_m: accepted so all result classes share one constructor
        signature; not used.
    """
    super(VideoDecoderPerfTestResult, self).__init__(is_decoder=True)

  def _parseFailureLine(self, line):
    """Handles parsing a line from the failure log.

    Picks up measured frame rates, and takes codec, size and mime from each
    result found in a 'Raw data: ' line (a later result overrides an earlier
    one).
    """
    self._tryParseMeasuredFrameRate(line)
    # if the test failed, we can only get the codec/size/mime from the raw data.
    if line.startswith('Raw data: '):
      for result in self._parseRawData(line):
        fmt = result['DecOutputFormat']
        self.size = Size(fmt['width'], fmt['height'])
        self.codec = result['codec']
        self.mime = result['mime']

  def _parseMessage(self, message, array):
    """Handles parsing a message from ValueArrays.

    Only messages that start with 'codec=' and have decodeto=surface are
    used; each contributes one rate computed from its 'min' value.

    Arguments:
      message: the message attribute of the ValueArray; may be None when the
        attribute is missing, in which case the array is ignored.
      array: the ValueArray element (unused here).
    """
    if not message or not message.startswith('codec='):
      return
    result = dict(self._parsePartialResult(m)
                  for m in re.finditer(self.MESSAGE_PATTERN + '(?: |$)', message))
    if result.get('decodeto') != 'surface':
      return
    self.codec = result['codec']
    fmt = result['DecOutputFormat']
    self.size = Size(fmt['width'], fmt['height'])
    self.mime = result['mime']
    # presumably 'min' is a per-frame duration in microseconds, making this
    # a frames-per-second value -- TODO confirm against the test output.
    self._rates_from_message.append(1000000. / result['min'])


class Results(object):
  """Container that keeps all test results.

  Results are collected from one or more result XML files, all of which must
  report the same device build name, and can be dumped as a <MediaCodecs>
  XML document.
  """

  # Raw strings keep the regex escapes (\d) away from Python's own string
  # escape processing.
  VIDEO_ENCODER_DECODER_TEST_REGEX = re.compile(
      r'test(.*)(\d{4})x(\d{4})(Goog|Other)$')

  VIDEO_DECODER_PERF_TEST_REGEX = re.compile(
      r'test(VP[89]|H26[34]|MPEG4|HEVC)(\d+)x(\d+)(.*)$')

  TestCaseSpec = namedtuple('TestCaseSpec', 'package path class_ regex result_class')

  def __init__(self):
    self._results = []  # parsed result objects, in import order
    self._device = None  # build name of the first imported file

  def _getTestCases(self):
    """Returns the specs of the test cases this utility knows how to parse."""
    return [
      self.TestCaseSpec(package='CtsDeviceVideoPerf',
                   path='TestSuite/TestSuite/TestSuite/TestSuite/TestCase',
                   class_='VideoEncoderDecoderTest',
                   regex=self.VIDEO_ENCODER_DECODER_TEST_REGEX,
                   result_class=VideoEncoderDecoderTestResult),
      self.TestCaseSpec(package='CtsMediaTestCases',
                   path='TestSuite/TestSuite/TestSuite/TestCase',
                   class_='VideoDecoderPerfTest',
                   regex=self.VIDEO_DECODER_PERF_TEST_REGEX,
                   result_class=VideoDecoderPerfTestResult)
    ]

  def _verifyDeviceInfo(self, device):
    """Checks that device matches the device of earlier imports.

    Raises:
      AssertionError: if a different device was recorded before.
    """
    # Raised explicitly (instead of using an assert statement) so the check
    # is still performed when Python runs with -O.
    if self._device not in (None, device):
      raise AssertionError("expected %s device" % self._device)
    self._device = device

  def importXml(self, xml):
    """Imports all known test results from a parsed result XML.

    Arguments:
      xml: an ElementTree (or element) of a result file.

    Raises:
      ValueError: if there is no DeviceInfo/BuildInfo element.
      AssertionError: if the build name differs from earlier imports.
    """
    build_info = xml.find('DeviceInfo/BuildInfo')
    if build_info is None:
      raise ValueError('missing DeviceInfo/BuildInfo element')
    self._verifyDeviceInfo(build_info.get('buildName'))

    packages = createLookup(self._getTestCases(), lambda tc: tc.package)

    for pkg in xml.iter('TestPackage'):
      specs_in_package = packages.get(pkg.get('name'))
      if not specs_in_package:
        continue
      paths = createLookup(specs_in_package, lambda tc: tc.path)
      for path, specs_in_path in paths.items():
        classes = createLookup(specs_in_path, lambda tc: tc.class_)
        for test_case in pkg.iterfind(path):
          specs_in_class = classes.get(test_case.get('name'))
          if not specs_in_class:
            continue
          for test in test_case.iter('Test'):
            for spec in specs_in_class:
              m = spec.regex.match(test.get('name'))
              if m:
                result = spec.result_class(m)
                result.parse(test)
                self._results.append(result)

  def importFile(self, path):
    """Parses the XML file at path and imports its results.

    Raises:
      ValueError: if the file is not well-formed XML, or as for importXml().
    """
    # A plain stream write behaves the same on Python 2 and Python 3.
    sys.stderr.write('Importing "%s"...\n' % path)
    try:
      return self.importXml(ET.parse(path))
    except ET.ParseError:
      raise ValueError('not a valid XML file')

  def getData(self):
    """Yields the data of every imported result, in import order."""
    for result in self._results:
      for data in result.getData():
        yield data

  def dumpXml(self, results):
    """Generates the lines of a <MediaCodecs> XML document.

    Arguments:
      results: an iterable of result objects with codec, mime, size, rates
        and is_decoder attributes; each must have at least one rate.

    Yields:
      The lines of the document, without line terminators.
    """
    yield '<?xml version="1.0" encoding="utf-8" ?>'
    yield '<!-- Copyright 2015 The Android Open Source Project'
    yield ''
    yield '     Licensed under the Apache License, Version 2.0 (the "License");'
    yield '     you may not use this file except in compliance with the License.'
    yield '     You may obtain a copy of the License at'
    yield ''
    yield '          http://www.apache.org/licenses/LICENSE-2.0'
    yield ''
    yield '     Unless required by applicable law or agreed to in writing, software'
    yield '     distributed under the License is distributed on an "AS IS" BASIS,'
    yield '     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.'
    yield '     See the License for the specific language governing permissions and'
    yield '     limitations under the License.'
    yield '-->'
    yield ''
    yield '<MediaCodecs>'
    last_section = None
    # The field order is the sort order: encoders before decoders, then
    # non-google before google components, then by mime type and name.
    Comp = namedtuple('Comp', 'is_decoder google mime name')
    by_comp = createLookup(results,
                           lambda e: Comp(is_decoder=e.is_decoder, google='.google.' in e.codec, mime=e.mime, name=e.codec))
    for comp in sorted(by_comp):
      section = 'Decoders' if comp.is_decoder else 'Encoders'
      if section != last_section:
        if last_section:
          yield '    </%s>' % last_section
        yield '    <%s>' % section
        last_section = section
      yield '        <MediaCodec name="%s" type="%s" update="true">' % (comp.name, comp.mime)
      by_size = createLookup(by_comp[comp], lambda e: e.size)
      for size in sorted(by_size):
        values = list(itertools.chain.from_iterable(e.rates for e in by_size[size]))
        min_, max_ = min(values), max(values)
        # The geometric mean of the extremes is reported as both ends of the
        # range.
        med_ = int(math.sqrt(min_ * max_))
        yield '            <Limit name="measured-frame-rate-%s" range="%d-%d" />' % (size, med_, med_)
      yield '        </MediaCodec>'
    if last_section:
      yield '    </%s>' % last_section
    yield '</MediaCodecs>'


class Main(object):
  """Executor of this utility.

  Imports every result XML named on the command line and prints the
  resulting document to standard output.
  """

  def __init__(self):
    self._result = Results()
    self._args = None  # set by _parseArgs()

    self._parser = argparse.ArgumentParser('get_achievable_framerates')
    self._parser.add_argument('result_xml', nargs='+')

  def _parseArgs(self):
    """Parses the command line into self._args."""
    self._args = self._parser.parse_args()

  def _importXml(self, xml):
    """Imports the results of the XML file at path xml."""
    self._result.importFile(xml)

  def _report(self):
    """Writes the document for all non-empty results to standard output."""
    # Results that are falsy (no usable rates) are left out of the report.
    for line in self._result.dumpXml(r for r in self._result.getData() if r):
      # A plain stream write (rather than the print statement) behaves the
      # same on Python 2 and Python 3.
      sys.stdout.write('%s\n' % line)

  def run(self):
    """Runs the utility: parse arguments, import all files, then report.

    An import error is reported on standard error and aborts the run without
    producing a report; it is funnelled through KeyboardInterrupt so it
    shares the handling of a user interrupt.
    """
    self._parseArgs()
    try:
      for xml in self._args.result_xml:
        try:
          self._importXml(xml)
        except (ValueError, IOError, AssertionError) as e:
          sys.stderr.write('%s\n' % (e,))
          raise KeyboardInterrupt
      self._report()
    except KeyboardInterrupt:
      sys.stderr.write('Interrupted.\n')

# Run the utility only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == '__main__':
  Main().run()