#!/usr/bin/python
# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
# pylint: disable-msg=W0311
from collections import namedtuple
import argparse
import glob
import json
import os
import pprint
import re
import subprocess
_EXPECTATIONS_DIR = 'expectations'
_AUTOTEST_RESULT_ID_TEMPLATE = 'gs://chromeos-autotest-results/%s-chromeos-test/chromeos*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
#_AUTOTEST_RESULT_TAG_TEMPLATE = 'gs://chromeos-autotest-results/%s/graphics_dEQP/debug/graphics_dEQP.DEBUG'
_AUTOTEST_RESULT_TAG_TEMPLATE = 'gs://chromeos-autotest-results/%s/debug/client.0.DEBUG'
# Use this template for tryjob results:
#_AUTOTEST_RESULT_TEMPLATE = 'gs://chromeos-autotest-results/%s-ihf/*/graphics_dEQP/debug/graphics_dEQP.DEBUG'
_BOARD_REGEX = re.compile(r'ChromeOS BOARD = (.+)')
_CPU_FAMILY_REGEX = re.compile(r'ChromeOS CPU family = (.+)')
_GPU_FAMILY_REGEX = re.compile(r'ChromeOS GPU family = (.+)')
_TEST_FILTER_REGEX = re.compile(r'dEQP test filter = (.+)')
_HASTY_MODE_REGEX = re.compile(r'\'hasty\': \'True\'|Running in hasty mode.')
#04/23 07:30:21.624 INFO |graphics_d:0240| TestCase: dEQP-GLES3.functional.shaders.operator.unary_operator.bitwise_not.highp_ivec3_vertex
#04/23 07:30:21.840 INFO |graphics_d:0261| Result: Pass
_TEST_RESULT_REGEX = re.compile(r'TestCase: (.+?)$\n.+? Result: (.+?)$',
re.MULTILINE)
_HASTY_TEST_RESULT_REGEX = re.compile(
r'\[stdout\] Test case \'(.+?)\'..$\n'
r'.+?\[stdout\] (Pass|NotSupported|QualityWarning|CompatibilityWarning|'
r'Fail|ResourceError|Crash|Timeout|InternalError|Skipped) \((.+)\)', re.MULTILINE)
Logfile = namedtuple('Logfile', 'job_id name gs_path')
def execute(cmd_list):
sproc = subprocess.Popen(cmd_list, stdout=subprocess.PIPE)
return sproc.communicate()[0]
def get_metadata(s):
cpu = re.search(_CPU_FAMILY_REGEX, s).group(1)
gpu = re.search(_GPU_FAMILY_REGEX, s).group(1)
board = re.search(_BOARD_REGEX, s).group(1)
filter = re.search(_TEST_FILTER_REGEX, s).group(1)
hasty = False
if re.search(_HASTY_MODE_REGEX, s):
hasty = True
print('Found results from %s for GPU = %s, filter = %s and hasty = %r.' %
(board, gpu, filter, hasty))
return board, gpu, filter, hasty
def copy_logs_from_gs_path(autotest_result_path):
logs = []
gs_paths = execute(['gsutil', 'ls', autotest_result_path]).splitlines()
for gs_path in gs_paths:
job_id = gs_path.split('/')[3].split('-')[0]
# DEBUG logs have more information than INFO logs, especially for hasty.
name = os.path.join('logs', job_id + '_graphics_dEQP.DEBUG')
logs.append(Logfile(job_id, name, gs_path))
for log in logs:
execute(['gsutil', 'cp', log.gs_path, log.name])
return logs
def get_local_logs():
logs = []
for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.INFO')):
job_id = name.split('_')[0]
logs.append(Logfile(job_id, name, name))
for name in glob.glob(os.path.join('logs', '*_graphics_dEQP.DEBUG')):
job_id = name.split('_')[0]
logs.append(Logfile(job_id, name, name))
return logs
def get_all_tests(text):
tests = []
for test, result in re.findall(_TEST_RESULT_REGEX, text):
tests.append((test, result))
for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
tests.append((test, result))
return tests
def get_not_passing_tests(text):
not_passing = []
for test, result in re.findall(_TEST_RESULT_REGEX, text):
if not (result == 'Pass' or result == 'NotSupported' or result == 'Skipped' or
result == 'QualityWarning' or result == 'CompatibilityWarning'):
not_passing.append((test, result))
for test, result, details in re.findall(_HASTY_TEST_RESULT_REGEX, text):
if result != 'Pass':
not_passing.append((test, result))
return not_passing
def load_expectation_dict(json_file):
data = {}
if os.path.isfile(json_file):
print 'Loading file ' + json_file
with open(json_file, 'r') as f:
text = f.read()
data = json.loads(text)
return data
def load_expectations(json_file):
data = load_expectation_dict(json_file)
expectations = {}
# Convert from dictionary of lists to dictionary of sets.
for key in data:
expectations[key] = set(data[key])
return expectations
def expectation_list_to_dict(tests):
data = {}
tests = list(set(tests))
for test, result in tests:
if data.has_key(result):
new_list = list(set(data[result].append(test)))
data.pop(result)
data[result] = new_list
else:
data[result] = [test]
return data
def save_expectation_dict(expectation_path, expectation_dict):
# Clean up obsolete expectations.
for file_name in glob.glob(expectation_path + '.*'):
if not '.hasty.' in file_name or '.hasty' in expectation_path:
os.remove(file_name)
# Dump json for next iteration.
with open(expectation_path + '.json', 'w') as f:
json.dump(expectation_dict,
f,
sort_keys=True,
indent=4,
separators=(',', ': '))
# Dump plain text for autotest.
for key in expectation_dict:
if expectation_dict[key]:
with open(expectation_path + '.' + key, 'w') as f:
for test in expectation_dict[key]:
f.write(test)
f.write('\n')
# Figure out duplicates and move them to Flaky result set/list.
def process_flaky(status_dict):
"""Figure out duplicates and move them to Flaky result set/list."""
clean_dict = {}
flaky = set([])
if status_dict.has_key('Flaky'):
flaky = status_dict['Flaky']
# FLaky tests are tests with 2 distinct results.
for key1 in status_dict.keys():
for key2 in status_dict.keys():
if key1 != key2:
flaky |= status_dict[key1] & status_dict[key2]
# Remove Flaky tests from other status and convert to dict of list.
for key in status_dict.keys():
if key != 'Flaky':
not_flaky = list(status_dict[key] - flaky)
not_flaky.sort()
print 'Number of "%s" is %d.' % (key, len(not_flaky))
clean_dict[key] = not_flaky
# And finally process flaky list/set.
flaky_list = list(flaky)
flaky_list.sort()
clean_dict['Flaky'] = flaky_list
return clean_dict
def merge_expectation_list(expectation_path, tests):
status_dict = {}
expectation_json = expectation_path + '.json'
if os.access(expectation_json, os.R_OK):
status_dict = load_expectations(expectation_json)
else:
print 'Could not load', expectation_json
for test, result in tests:
if status_dict.has_key(result):
new_set = status_dict[result]
new_set.add(test)
status_dict.pop(result)
status_dict[result] = new_set
else:
status_dict[result] = set([test])
clean_dict = process_flaky(status_dict)
save_expectation_dict(expectation_path, clean_dict)
def load_log(name):
"""Load test log and clean it from stderr spew."""
with open(name) as f:
lines = f.read().splitlines()
text = ''
for line in lines:
if ('dEQP test filter =' in line or 'ChromeOS BOARD = ' in line or
'ChromeOS CPU family =' in line or 'ChromeOS GPU family =' in line or
'TestCase: ' in line or 'Result: ' in line or
'Test Options: ' in line or 'Running in hasty mode.' in line or
# For hasty logs we have:
'Pass (' in line or 'NotSupported (' in line or 'Skipped (' in line or
'QualityWarning (' in line or 'CompatibilityWarning (' in line or
'Fail (' in line or 'ResourceError (' in line or 'Crash (' in line or
'Timeout (' in line or 'InternalError (' in line or
' Test case \'' in line):
text += line + '\n'
# TODO(ihf): Warn about or reject log files missing the end marker.
return text
def all_passing(tests):
for _, result in tests:
if not (result == 'Pass'):
return False
return True
def process_logs(logs):
for log in logs:
text = load_log(log.name)
if text:
print '================================================================'
print 'Loading %s...' % log.name
try:
_, gpu, filter, hasty = get_metadata(text)
tests = get_all_tests(text)
print 'Found %d test results.' % len(tests)
if all_passing(tests):
# Delete logs that don't contain failures.
os.remove(log.name)
else:
# GPU family goes first in path to simplify adding/deleting families.
output_path = os.path.join(_EXPECTATIONS_DIR, gpu)
if not os.access(output_path, os.R_OK):
os.makedirs(output_path)
expectation_path = os.path.join(output_path, filter)
if hasty:
expectation_path = os.path.join(output_path, filter + '.hasty')
merge_expectation_list(expectation_path, tests)
except:
print 'Error processing %s' % log.name
JOB_TAGS_ALL = (
'select distinct job_tag from chromeos_autotest_db.tko_test_view_2 '
'where not job_tag like "%%hostless" and '
'test_name LIKE "graphics_dEQP%%" and '
'build_version>="%s" and '
'build_version<="%s" and '
'((status = "FAIL" and not job_name like "%%.NotPass") or '
'job_name like "%%.functional" or '
'job_name like "%%-master")' )
JOB_TAGS_MASTER = (
'select distinct job_tag from chromeos_autotest_db.tko_test_view_2 '
'where not job_tag like "%%hostless" and '
'test_name LIKE "graphics_dEQP%%" and '
'build_version>="%s" and '
'build_version<="%s" and '
'job_name like "%%-master"' )
def get_result_paths_from_autotest_db(host, user, password, build_from,
build_to):
paths = []
# TODO(ihf): Introduce flag to toggle between JOB_TAGS_ALL and _MASTER.
sql = JOB_TAGS_MASTER % (build_from, build_to)
cmd = ['mysql', '-u%s' % user, '-p%s' % password, '--host', host, '-e', sql]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE)
for line in p.communicate()[0].splitlines():
# Skip over unrelated sql spew (really first line only):
if line and 'chromeos-test' in line:
paths.append(_AUTOTEST_RESULT_TAG_TEMPLATE % line.rstrip())
print 'Found %d potential results in the database.' % len(paths)
return paths
def copy_logs_from_gs_paths(paths):
i = 1
for gs_path in paths:
print '[%d/%d] %s' % (i, len(paths), gs_path)
copy_logs_from_gs_path(gs_path)
i = i+1
argparser = argparse.ArgumentParser(
description='Download from GS and process dEQP logs into expectations.')
argparser.add_argument(
'--host',
dest='host',
default='173.194.81.83',
help='Host containing autotest result DB.')
argparser.add_argument('--user', dest='user', help='Database user account.')
argparser.add_argument(
'--password',
dest='password',
help='Password for user account.')
argparser.add_argument(
'--from',
dest='build_from',
help='Lowest build revision to include. Example: R51-8100.0.0')
argparser.add_argument(
'--to',
dest='build_to',
help='Highest build revision to include. Example: R51-8101.0.0')
args = argparser.parse_args()
print pprint.pformat(args)
# This is somewhat optional. Remove existing expectations to start clean, but
# feel free to process them incrementally.
execute(['rm', '-rf', _EXPECTATIONS_DIR])
copy_logs_from_gs_paths(get_result_paths_from_autotest_db(
args.host, args.user, args.password, args.build_from, args.build_to))
# This will include the just downloaded logs from GS as well.
logs = get_local_logs()
process_logs(logs)