普通文本  |  334行  |  11.26 KB

#!/usr/bin/python
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# pylint: disable-msg=W0311

from collections import namedtuple
import argparse
import glob
import json
import os
import pprint
import re
import subprocess

_EXPECTATIONS_DIR = 'expectations'
_AUTOTEST_RESULT_ID_TEMPLATE = 'gs://chromeos-autotest-results/%s-chromeos-test/chromeos*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
#_AUTOTEST_RESULT_TAG_TEMPLATE = 'gs://chromeos-autotest-results/%s/graphics_dEQP/debug/graphics_dEQP.DEBUG'
_AUTOTEST_RESULT_TAG_TEMPLATE = 'gs://chromeos-autotest-results/%s/debug/client.0.DEBUG'
# Use this template for tryjob results:
#_AUTOTEST_RESULT_TEMPLATE = 'gs://chromeos-autotest-results/%s-ihf/*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
_BOARD_REGEX = re.compile(r'ChromeOS BOARD = (.+)')
_CPU_FAMILY_REGEX = re.compile(r'ChromeOS CPU family = (.+)')
_GPU_FAMILY_REGEX = re.compile(r'ChromeOS GPU family = (.+)')
_TEST_FILTER_REGEX = re.compile(r'dEQP test filter = (.+)')
_HASTY_MODE_REGEX = re.compile(r'\'hasty\': \'True\'|Running in hasty mode.')

#04/23 07:30:21.624 INFO |graphics_d:0240| TestCase: dEQP-GLES3.functional.shaders.operator.unary_operator.bitwise_not.highp_ivec3_vertex
#04/23 07:30:21.840 INFO |graphics_d:0261| Result: Pass
_TEST_RESULT_REGEX = re.compile(r'TestCase: (.+?)$\n.+? Result: (.+?)$',
                                re.MULTILINE)
_HASTY_TEST_RESULT_REGEX = re.compile(
    r'\[stdout\] Test case \'(.+?)\'..$\n'
    r'.+?\[stdout\]   (Pass|NotSupported|QualityWarning|CompatibilityWarning|'
    r'Fail|ResourceError|Crash|Timeout|InternalError|Skipped) \((.+)\)', re.MULTILINE)
Logfile = namedtuple('Logfile', 'job_id name gs_path')


def execute(cmd_list):
  sproc = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
  return sproc.communicate()[0]


def get_metadata(s):
  cpu = re.search(_CPU_FAMILY_REGEX, s).group(1)
  gpu = re.search(_GPU_FAMILY_REGEX, s).group(1)
  board = re.search(_BOARD_REGEX, s).group(1)
  filter = re.search(_TEST_FILTER_REGEX, s).group(1)
  hasty = False
  if re.search(_HASTY_MODE_REGEX, s):
    hasty = True
  print('Found results from %s for GPU = %s, filter = %s and hasty = %r.' %
        (board, gpu, filter, hasty))
  return board, gpu, filter, hasty


def copy_logs_from_gs_path(autotest_result_path):
  logs = []
  gs_paths = execute(['gsutil', 'ls', autotest_result_path]).splitlines()
  for gs_path in gs_paths:
    job_id = gs_path.split('/')[3].split('-')[0]
    # DEBUG logs have more information than INFO logs, especially for hasty.
    name = os.path.join('logs', job_id + '_graphics_dEQP.DEBUG')
    logs.append(Logfile(job_id, name, gs_path))
  for log in logs:
    execute(['gsutil', 'cp', log.gs_path, log.name])
  return logs


def get_local_logs():
  logs = []
  for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.INFO')):
    job_id = name.split('_')[0]
    logs.append(Logfile(job_id, name, name))
  for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.DEBUG')):
    job_id = name.split('_')[0]
    logs.append(Logfile(job_id, name, name))
  return logs


def get_all_tests(text):
  tests = []
  for test, result in re.findall(_TEST_RESULT_REGEX, text):
    tests.append((test, result))
  for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
    tests.append((test, result))
  return tests


def get_not_passing_tests(text):
  not_passing = []
  for test, result in re.findall(_TEST_RESULT_REGEX, text):
    if not (result == 'Pass' or result == 'NotSupported' or result == 'Skipped' or
            result == 'QualityWarning' or result == 'CompatibilityWarning'):
      not_passing.append((test, result))
  for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
    if result != 'Pass':
      not_passing.append((test, result))
  return not_passing


def load_expectation_dict(json_file):
  data = {}
  if os.path.isfile(json_file):
    print 'Loading file ' + json_file
    with open(json_file, 'r') as f:
      text = f.read()
      data = json.loads(text)
  return data


def load_expectations(json_file):
  data = load_expectation_dict(json_file)
  expectations = {}
  # Convert from dictionary of lists to dictionary of sets.
  for key in data:
    expectations[key] = set(data[key])
  return expectations


def expectation_list_to_dict(tests):
  data = {}
  tests = list(set(tests))
  for test, result in tests:
    if data.has_key(result):
      new_list = list(set(data[result].append(test)))
      data.pop(result)
      data[result] = new_list
    else:
      data[result] = [test]
  return data


def save_expectation_dict(expectation_path, expectation_dict):
  # Clean up obsolete expectations.
  for file_name in glob.glob(expectation_path + '.*'):
    if not '.hasty.' in file_name or '.hasty' in expectation_path:
      os.remove(file_name)
  # Dump json for next iteration.
  with open(expectation_path + '.json', 'w') as f:
    json.dump(expectation_dict,
              f,
              sort_keys=True,
              indent=4,
              separators=(',', ': '))
  # Dump plain text for autotest.
  for key in expectation_dict:
    if expectation_dict[key]:
      with open(expectation_path + '.' + key, 'w') as f:
        for test in expectation_dict[key]:
          f.write(test)
          f.write('\n')


# Figure out duplicates and move them to Flaky result set/list.
def process_flaky(status_dict):
  """Figure out duplicates and move them to Flaky result set/list."""
  clean_dict = {}
  flaky = set([])
  if status_dict.has_key('Flaky'):
    flaky = status_dict['Flaky']

  # FLaky tests are tests with 2 distinct results.
  for key1 in status_dict.keys():
    for key2 in status_dict.keys():
      if key1 != key2:
        flaky |= status_dict[key1] & status_dict[key2]

  # Remove Flaky tests from other status and convert to dict of list.
  for key in status_dict.keys():
    if key != 'Flaky':
      not_flaky = list(status_dict[key] - flaky)
      not_flaky.sort()
      print 'Number of "%s" is %d.' % (key, len(not_flaky))
      clean_dict[key] = not_flaky

  # And finally process flaky list/set.
  flaky_list = list(flaky)
  flaky_list.sort()
  clean_dict['Flaky'] = flaky_list

  return clean_dict


def merge_expectation_list(expectation_path, tests):
  status_dict = {}
  expectation_json = expectation_path + '.json'
  if os.access(expectation_json, os.R_OK):
    status_dict = load_expectations(expectation_json)
  else:
    print 'Could not load', expectation_json
  for test, result in tests:
    if status_dict.has_key(result):
      new_set = status_dict[result]
      new_set.add(test)
      status_dict.pop(result)
      status_dict[result] = new_set
    else:
      status_dict[result] = set([test])
  clean_dict = process_flaky(status_dict)
  save_expectation_dict(expectation_path, clean_dict)


def load_log(name):
  """Load test log and clean it from stderr spew."""
  with open(name) as f:
    lines = f.read().splitlines()
  text = ''
  for line in lines:
    if ('dEQP test filter =' in line or 'ChromeOS BOARD = ' in line or
        'ChromeOS CPU family =' in line or 'ChromeOS GPU family =' in line or
        'TestCase: ' in line or 'Result: ' in line or
        'Test Options: ' in line or 'Running in hasty mode.' in line or
        # For hasty logs we have:
        'Pass (' in line or 'NotSupported (' in line or 'Skipped (' in line or
        'QualityWarning (' in line or 'CompatibilityWarning (' in line or
        'Fail (' in line or 'ResourceError (' in line or 'Crash (' in line or
        'Timeout (' in line or 'InternalError (' in line or
        ' Test case \'' in line):
      text += line + '\n'
  # TODO(ihf): Warn about or reject log files missing the end marker.
  return text


def all_passing(tests):
  for _, result in tests:
    if not (result == 'Pass'):
      return False
  return True


def process_logs(logs):
  for log in logs:
    text = load_log(log.name)
    if text:
      print '================================================================'
      print 'Loading %s...' % log.name
      try:
        _, gpu, filter, hasty = get_metadata(text)
        tests = get_all_tests(text)
        print 'Found %d test results.' % len(tests)
        if all_passing(tests):
          # Delete logs that don't contain failures.
          os.remove(log.name)
        else:
          # GPU family goes first in path to simplify adding/deleting families.
          output_path = os.path.join(_EXPECTATIONS_DIR, gpu)
          if not os.access(output_path, os.R_OK):
            os.makedirs(output_path)
          expectation_path = os.path.join(output_path, filter)
          if hasty:
            expectation_path = os.path.join(output_path, filter + '.hasty')
          merge_expectation_list(expectation_path, tests)
      except:
        print 'Error processing %s' % log.name


JOB_TAGS_ALL = (
'select distinct job_tag from chromeos_autotest_db.tko_test_view_2 '
'where not job_tag like "%%hostless" and '
'test_name LIKE "graphics_dEQP%%" and '
'build_version>="%s" and '
'build_version<="%s" and '
'((status = "FAIL" and not job_name like "%%.NotPass") or '
'job_name like "%%.functional" or '
'job_name like "%%-master")' )

JOB_TAGS_MASTER = (
'select distinct job_tag from chromeos_autotest_db.tko_test_view_2 '
'where not job_tag like "%%hostless" and '
'test_name LIKE "graphics_dEQP%%" and '
'build_version>="%s" and '
'build_version<="%s" and '
'job_name like "%%-master"' )

def get_result_paths_from_autotest_db(host, user, password, build_from,
                                      build_to):
  paths = []
  # TODO(ihf): Introduce flag to toggle between JOB_TAGS_ALL and _MASTER.
  sql = JOB_TAGS_MASTER % (build_from, build_to)
  cmd = ['mysql', '-u%s' % user, '-p%s' % password, '--host', host, '-e', sql]
  p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
  for line in p.communicate()[0].splitlines():
    # Skip over unrelated sql spew (really first line only):
    if line and 'chromeos-test' in line:
      paths.append(_AUTOTEST_RESULT_TAG_TEMPLATE % line.rstrip())
  print 'Found %d potential results in the database.' % len(paths)
  return paths


def copy_logs_from_gs_paths(paths):
  i = 1
  for gs_path in paths:
    print '[%d/%d] %s' % (i, len(paths), gs_path)
    copy_logs_from_gs_path(gs_path)
    i = i+1


argparser = argparse.ArgumentParser(
    description='Download from GS and process dEQP logs into expectations.')
argparser.add_argument(
    '--host',
    dest='host',
    default='173.194.81.83',
    help='Host containing autotest result DB.')
argparser.add_argument('--user', dest='user', help='Database user account.')
argparser.add_argument(
    '--password',
    dest='password',
    help='Password for user account.')
argparser.add_argument(
    '--from',
    dest='build_from',
    help='Lowest build revision to include. Example: R51-8100.0.0')
argparser.add_argument(
    '--to',
    dest='build_to',
    help='Highest build revision to include. Example: R51-8101.0.0')

args = argparser.parse_args()

print pprint.pformat(args)
# This is somewhat optional. Remove existing expectations to start clean, but
# feel free to process them incrementally.
execute(['rm', '-rf', _EXPECTATIONS_DIR])

copy_logs_from_gs_paths(get_result_paths_from_autotest_db(
    args.host, args.user, args.password, args.build_from, args.build_to))

# This will include the just downloaded logs from GS as well.
logs = get_local_logs()
process_logs(logs)