#!/usr/bin/python
import hashlib
import optparse
import os
import re
import shlex
import subprocess
import sys
import threading
import time
# Human-readable task type labels. Every task class returns one of these
# from get_type(), and the printers show it next to the task name.
TASK_COMPILATION = 'compile'
TASK_DISABLE_OVERLAYS = 'disable overlays'
TASK_ENABLE_MULTIPLE_OVERLAYS = 'enable multiple overlays'
TASK_ENABLE_SINGLE_OVERLAY = 'enable single overlay'
TASK_FILE_EXISTS_TEST = 'test (file exists)'
TASK_GREP_IDMAP_TEST = 'test (grep idmap)'
TASK_MD5_TEST = 'test (md5)'
TASK_IDMAP_PATH = 'idmap --path'
TASK_IDMAP_SCAN = 'idmap --scan'
TASK_INSTRUMENTATION = 'instrumentation'
TASK_INSTRUMENTATION_TEST = 'test (instrumentation)'
TASK_MKDIR = 'mkdir'
TASK_PUSH = 'push'
TASK_ROOT = 'root'
TASK_REMOUNT = 'remount'
TASK_RM = 'rm'
TASK_SETUP_IDMAP_PATH = 'setup idmap --path'
TASK_SETUP_IDMAP_SCAN = 'setup idmap --scan'
TASK_START = 'start'
TASK_STOP = 'stop'
# Base adb command line used by every task that talks to the device. The
# -d, -e and -s command line options replace it via _set_adb_device().
adb = 'adb'
def _adb_shell(cmd):
    """Run *cmd* through ``adb shell`` and return its exit status and output.

    The exit status is not taken from the adb process itself: ``; echo $?``
    is appended to the command and the status is parsed from the last line
    that the device prints.

    Args:
        cmd: shell command line to run on the device.

    Returns:
        A ``(returncode, stdout, stderr)`` tuple. ``stdout`` has the trailing
        status line removed and all carriage returns stripped. If no status
        line can be parsed (for example because adb itself failed), a
        non-zero return code is reported together with the raw output.
    """
    # Build argv as a list so that quotes inside cmd cannot break the
    # command line that is handed to adb.
    argv = shlex.split(adb) + ['shell', cmd + '; echo $?']
    proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    stdout, stderr = proc.communicate()
    # Pipes deliver bytes on Python 3; normalise to text before parsing.
    if not isinstance(stdout, str):
        stdout = stdout.decode('utf-8', 'replace')
    if not isinstance(stderr, str):
        stderr = stderr.decode('utf-8', 'replace')
    stdout = stdout.replace('\r', '')
    stderr = stderr.replace('\r', '')

    # Expected shapes: "<status>\n" or "<output>\n<status>\n".
    parts = stdout.rsplit('\n', 2)
    try:
        if len(parts) == 2:
            output = ''
            returncode = int(parts[0])
        else:
            output = parts[0] + '\n'
            returncode = int(parts[1])
    except (IndexError, ValueError):
        # No parsable status line: report a failure instead of crashing.
        return proc.returncode or 1, stdout, stderr
    return returncode, output, stderr
class VerbosePrinter:
    """Interactive printer that shows each task with an animated status box.

    begin() prints the task line and starts a background Ticker that keeps
    redrawing the box; end_pass() / end_fail() stop the ticker and replace
    the box with the final result.
    """

    class Ticker(threading.Thread):
        """Daemon thread that redraws the progress box every 0.25 seconds."""

        def _print(self):
            """Redraw the line with the current dot count, then advance it."""
            s = '\r' + self.text + '[' + '.' * self.i + ' ' * (4 - self.i) + ']'
            sys.stdout.write(s)
            sys.stdout.flush()
            self.i = (self.i + 1) % 5

        def __init__(self, cond_var, text):
            threading.Thread.__init__(self)
            self.text = text
            # Daemon thread: it must never keep the interpreter alive.
            self.daemon = True
            self.cond_var = cond_var
            self.running = False
            self.i = 0
            self._print()
            self.running = True

        def run(self):
            """Animate until stop() is called."""
            with self.cond_var:
                # Checking the flag before waiting means a stop() issued
                # before this thread got the lock ends the loop at once.
                while self.running:
                    self.cond_var.wait(0.25)
                    if self.running:
                        self._print()

        def stop(self):
            """Ask the thread to finish and wake it up."""
            with self.cond_var:
                self.running = False
                self.cond_var.notify_all()

    def _start_ticker(self):
        # Never leave an earlier ticker running unattended.
        self._stop_ticker()
        self.ticker = VerbosePrinter.Ticker(self.cond_var, self.text)
        self.ticker.start()

    def _stop_ticker(self):
        if self.ticker is None:
            return
        self.ticker.stop()
        self.ticker.join()
        self.ticker = None

    def _format_begin(self, type, name):
        """Return the task line, padded so the status box is right-aligned."""
        N = max(self.width - len(type) - len(' [ ] '), 0)
        fmt = '%%s %%-%ds ' % N
        return fmt % (type, name)

    def __init__(self, use_color):
        """Set up colors and detect the terminal width.

        Args:
            use_color: when true, failures are printed with ANSI colors.
        """
        self.cond_var = threading.Condition()
        self.ticker = None
        if use_color:
            self.color_RED = '\033[1;31m'
            self.color_red = '\033[0;31m'
            self.color_reset = '\033[0;37m'
        else:
            self.color_RED = ''
            self.color_red = ''
            self.color_reset = ''
        self.width = 72  # conservative guesstimate
        try:
            argv = shlex.split('stty size')  # get terminal width
            proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            (stdout, stderr) = proc.communicate()
            if proc.returncode == 0:
                (h, w) = stdout.split()
                self.width = int(w)
        except (OSError, ValueError):
            # stty missing or unexpected output: keep the default width.
            pass

    def begin(self, type, name):
        """Print the task line with an empty box and start the animation."""
        self.text = self._format_begin(type, name)
        sys.stdout.write(self.text + '[ ]')
        sys.stdout.flush()
        self._start_ticker()

    def end_pass(self, type, name):
        """Stop the animation and mark the current task as passed."""
        self._stop_ticker()
        sys.stdout.write('\r' + self.text + '[ OK ]\n')
        sys.stdout.flush()

    def end_fail(self, type, name, msg):
        """Stop the animation, mark the task as failed and print *msg*."""
        self._stop_ticker()
        sys.stdout.write('\r' + self.color_RED + self.text + '[FAIL]\n')
        sys.stdout.write(self.color_red)
        sys.stdout.write(msg)
        sys.stdout.write(self.color_reset)
        sys.stdout.flush()
class QuietPrinter:
    """Printer for quiet mode: one plain PASS/FAIL line per finished task."""

    def begin(self, type, name):
        """Nothing is printed when a task starts."""

    def end_pass(self, type, name):
        """Print the PASS line for the task."""
        line = ''.join(('PASS ', type, ' ', name, '\n'))
        sys.stdout.write(line)
        sys.stdout.flush()

    def end_fail(self, type, name, msg):
        """Print the FAIL line for the task; *msg* is not shown."""
        line = ''.join(('FAIL ', type, ' ', name, '\n'))
        sys.stdout.write(line)
        sys.stdout.flush()
class CompilationTask:
    """Task that runs make with ONE_SHOT_MAKEFILE pointing at one makefile."""

    def __init__(self, makefile):
        # Makefile path relative to the current working directory.
        self.makefile = makefile

    def get_type(self):
        return TASK_COMPILATION

    def get_name(self):
        return self.makefile

    def execute(self):
        """Run the build and return (returncode, stdout, stderr)."""
        makefile_path = os.getcwd() + "/" + self.makefile
        os.putenv('ONE_SHOT_MAKEFILE', makefile_path)
        build = subprocess.Popen(
            shlex.split('make -C "../../../../../" files'),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = build.communicate()
        return build.returncode, out, err
class InstrumentationTask:
    """Task that runs one instrumentation test class with ``am instrument``."""

    def __init__(self, instrumentation_class):
        self.instrumentation_class = instrumentation_class

    def get_type(self):
        return TASK_INSTRUMENTATION

    def get_name(self):
        return self.instrumentation_class

    def execute(self):
        """Run the test class on the device and return the _adb_shell result."""
        command = 'am instrument -r -w -e class %s com.android.overlaytest/android.test.InstrumentationTestRunner' % self.instrumentation_class
        return _adb_shell(command)
class PushTask:
    """Task that copies a build artifact from $OUT to the device."""

    def __init__(self, src, dest):
        # src is relative to the directory named by the OUT environment
        # variable; dest is the path on the device.
        self.src = src
        self.dest = dest

    def get_type(self):
        return TASK_PUSH

    def get_name(self):
        return "%s -> %s" % (self.src, self.dest)

    def execute(self):
        """Run ``adb push`` and return (returncode, stdout, stderr).

        A missing OUT environment variable is reported as a failed task
        rather than raising.
        """
        out_dir = os.getenv('OUT')
        if not out_dir:
            return 1, '', 'OUT environment variable is not set\n'
        src = out_dir + "/" + self.src
        # Pass the paths as separate argv entries so that whitespace in a
        # path cannot split it into several arguments.
        argv = shlex.split(adb) + ['push', src, self.dest]
        proc = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        (stdout, stderr) = proc.communicate()
        return proc.returncode, stdout, stderr
class MkdirTask:
    """Task that creates a directory on the device with ``mkdir -p``."""

    def __init__(self, path):
        self.path = path

    def get_type(self):
        return TASK_MKDIR

    def get_name(self):
        return self.path

    def execute(self):
        """Create the directory and return the _adb_shell result."""
        command = 'mkdir -p %s' % self.path
        return _adb_shell(command)
class RmTask:
    """Task that removes a path on the device; a missing path is not an error."""

    def __init__(self, path):
        self.path = path

    def get_type(self):
        return TASK_RM

    def get_name(self):
        return self.path

    def execute(self):
        """Remove the path with ``rm -r`` unless ``ls`` reports it missing."""
        status, listing, _ = _adb_shell('ls %s' % self.path)
        already_gone = listing.endswith(': No such file or directory\n')
        if status == 0 or not already_gone:
            return _adb_shell('rm -r %s' % self.path)
        # ls failed because the path does not exist: nothing to remove.
        return 0, "", ""
class IdmapPathTask:
    """Task that runs ``idmap --path`` on the device as the system user."""

    def __init__(self, path_target_apk, path_overlay_apk, path_idmap):
        self.path_target_apk = path_target_apk
        self.path_overlay_apk = path_overlay_apk
        self.path_idmap = path_idmap

    def get_type(self):
        return TASK_IDMAP_PATH

    def get_name(self):
        return self.path_idmap

    def execute(self):
        """Run idmap with the three stored paths; return the _adb_shell result."""
        paths = (self.path_target_apk, self.path_overlay_apk, self.path_idmap)
        command = 'su system idmap --path "%s" "%s" "%s"' % paths
        return _adb_shell(command)
class IdmapScanTask:
    """Task that runs ``idmap --scan`` on the device as the system user."""

    def __init__(self, overlay_dir, target_pkg_name, target_pkg, idmap_dir, symlink_dir):
        self.overlay_dir = overlay_dir
        self.target_pkg_name = target_pkg_name
        self.target_pkg = target_pkg
        self.idmap_dir = idmap_dir
        # Stored only; the idmap command line below does not use it.
        self.symlink_dir = symlink_dir

    def get_type(self):
        return TASK_IDMAP_SCAN

    def get_name(self):
        return self.target_pkg_name

    def execute(self):
        """Run the scan and return the _adb_shell result."""
        scan_args = (self.overlay_dir, self.target_pkg_name, self.target_pkg, self.idmap_dir)
        command = 'su system idmap --scan "%s" "%s" "%s" "%s"' % scan_args
        return _adb_shell(command)
class FileExistsTest:
    """Test that passes when ``ls`` on the device succeeds for the path."""

    def __init__(self, path):
        self.path = path

    def get_type(self):
        return TASK_FILE_EXISTS_TEST

    def get_name(self):
        return self.path

    def execute(self):
        """List the path on the device and return the _adb_shell result."""
        command = 'ls %s' % self.path
        return _adb_shell(command)
class GrepIdmapTest:
    """Test that counts regex matches in the output of ``idmap --inspect``."""

    def __init__(self, path_idmap, pattern, expected_n):
        self.path_idmap = path_idmap
        # Regular expression; it must match at the end of a line and be
        # preceded by whitespace.
        self.pattern = pattern
        self.expected_n = expected_n

    def get_type(self):
        return TASK_GREP_IDMAP_TEST

    def get_name(self):
        return self.pattern

    def execute(self):
        """Inspect the idmap and compare the match count with expected_n.

        Returns:
            The failing _adb_shell result if the inspect command fails,
            ``(1, message, '')`` if the count differs, else ``(0, "", "")``.
        """
        returncode, stdout, stderr = _adb_shell('idmap --inspect %s' % self.path_idmap)
        if returncode != 0:
            return returncode, stdout, stderr
        # Raw string: '\s' in a plain literal is an invalid escape sequence.
        all_matches = re.findall(r'\s' + self.pattern + '$', stdout, flags=re.MULTILINE)
        if len(all_matches) != self.expected_n:
            return 1, 'pattern=%s idmap=%s expected=%d found=%d\n' % (self.pattern, self.path_idmap, self.expected_n, len(all_matches)), ''
        return 0, "", ""
class Md5Test:
    """Test that compares the md5sum of a device file with expected content."""

    def __init__(self, path, expected_content):
        self.path = path
        # hashlib needs bytes; encode text so both kinds of input work.
        if not isinstance(expected_content, bytes):
            expected_content = expected_content.encode('utf-8')
        self.expected_md5 = hashlib.md5(expected_content).hexdigest()

    def get_type(self):
        return TASK_MD5_TEST

    def get_name(self):
        return self.path

    def execute(self):
        """Run ``md5sum`` on the device and compare with the expected digest.

        Returns:
            The failing _adb_shell result if md5sum fails, ``(1, message, ...)``
            if the digest is missing or different, else ``(0, "", "")``.
        """
        returncode, stdout, stderr = _adb_shell('md5sum %s' % self.path)
        if returncode != 0:
            return returncode, stdout, stderr
        fields = stdout.split()
        if not fields:
            # md5sum reported success but printed nothing to parse.
            return 1, 'md5sum produced no output for %s\n' % self.path, stderr
        actual_md5 = fields[0]
        if actual_md5 != self.expected_md5:
            return 1, 'expected %s, got %s\n' % (self.expected_md5, actual_md5), ''
        return 0, "", ""
class StartTask:
    """Task that runs ``start`` on the device and waits for boot to finish."""

    def get_type(self):
        return TASK_START

    def get_name(self):
        return ""

    def execute(self):
        """Start the device, then poll dev.bootcomplete until it reads 1.

        Any failing adb command ends the wait and its result is returned.
        """
        status, out, err = _adb_shell('start')
        if status != 0:
            return status, out, err
        booted = False
        while not booted:
            status, out, err = _adb_shell('getprop dev.bootcomplete')
            if status != 0:
                return status, out, err
            booted = out.strip() == "1"
            if not booted:
                time.sleep(0.5)
        return 0, "", ""
class StopTask:
    """Task that runs ``stop`` on the device and clears dev.bootcomplete."""

    def get_type(self):
        return TASK_STOP

    def get_name(self):
        return ""

    def execute(self):
        """Stop the device; on success reset dev.bootcomplete to 0."""
        result = _adb_shell('stop')
        (status, out, err) = result
        if status == 0:
            return _adb_shell('setprop dev.bootcomplete 0')
        return status, out, err
class RootTask:
    """Task that restarts adbd as root unless it already runs as root."""

    def get_type(self):
        return TASK_ROOT

    def get_name(self):
        return ""

    def execute(self):
        """Check service.adb.root, then run ``adb root`` and wait for the device."""
        status, out, err = _adb_shell('getprop service.adb.root 0')
        if status != 0:
            return status, out, err
        if out.strip() == '1':
            # already root
            return 0, "", ""
        # Both host-side commands must succeed, in this order.
        for subcommand in (' root', ' wait-for-device'):
            proc = subprocess.Popen(
                shlex.split(adb + subcommand),
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE)
            out, err = proc.communicate()
            if proc.returncode != 0:
                break
        return proc.returncode, out, err
class RemountTask:
    """Task that runs ``adb remount``."""

    def get_type(self):
        return TASK_REMOUNT

    def get_name(self):
        return ""

    def execute(self):
        """Remount and return (returncode, stdout, stderr).

        adb remount returns 0 even if the operation failed, so the output
        is checked for the failure message as well.
        """
        remount = subprocess.Popen(
            shlex.split(adb + ' remount'),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        out, err = remount.communicate()
        failed = out.startswith('remount failed:')
        status = 1 if failed else remount.returncode
        return status, out, err
class CompoundTask:
    """Task that runs a list of sub-tasks in order and stops at the first failure."""

    def __init__(self, type, tasks):
        self.type = type
        self.tasks = tasks

    def get_type(self):
        return self.type

    def get_name(self):
        return ""

    def execute(self):
        """Run the sub-tasks; return the first failing result or (0, "", "")."""
        for subtask in self.tasks:
            status, out, err = subtask.execute()
            if status == 0:
                continue
            return status, out, err
        return 0, "", ""
def _create_disable_overlays_task():
    """Build the compound task that removes all overlay apks and their idmaps."""
    paths = (
        "/vendor/overlay/framework_a.apk",
        "/vendor/overlay/framework_b.apk",
        "/data/resource-cache/vendor@overlay@framework_a.apk@idmap",
        "/data/resource-cache/vendor@overlay@framework_b.apk@idmap",
        "/vendor/overlay/app_a.apk",
        "/vendor/overlay/app_b.apk",
        "/data/resource-cache/vendor@overlay@app_a.apk@idmap",
        "/data/resource-cache/vendor@overlay@app_b.apk@idmap",
    )
    removals = [RmTask(path) for path in paths]
    return CompoundTask(TASK_DISABLE_OVERLAYS, removals)
def _create_enable_single_overlay_task():
    """Build the compound task that installs one framework and one app overlay.

    Existing overlays are removed first, then the overlay directories are
    created and the two apks are pushed.
    """
    pushes = (
        ('/data/app/com.android.overlaytest.overlay/com.android.overlaytest.overlay.apk', '/vendor/overlay/framework_a.apk'),
        ('/data/app/com.android.overlaytest.first_app_overlay/com.android.overlaytest.first_app_overlay.apk', '/vendor/overlay/app_a.apk'),
    )
    steps = [_create_disable_overlays_task()]
    steps.append(MkdirTask('/system/vendor'))
    steps.append(MkdirTask('/vendor/overlay'))
    for source, destination in pushes:
        steps.append(PushTask(source, destination))
    return CompoundTask(TASK_ENABLE_SINGLE_OVERLAY, steps)
def _create_enable_multiple_overlays_task():
    """Build the compound task that installs one framework and two app overlays.

    Existing overlays are removed first, then the overlay directories are
    created and the three apks are pushed.
    """
    pushes = (
        ('/data/app/com.android.overlaytest.overlay/com.android.overlaytest.overlay.apk', '/vendor/overlay/framework_b.apk'),
        ('/data/app/com.android.overlaytest.first_app_overlay/com.android.overlaytest.first_app_overlay.apk', '/vendor/overlay/app_a.apk'),
        ('/data/app/com.android.overlaytest.second_app_overlay/com.android.overlaytest.second_app_overlay.apk', '/vendor/overlay/app_b.apk'),
    )
    steps = [_create_disable_overlays_task()]
    steps.append(MkdirTask('/system/vendor'))
    steps.append(MkdirTask('/vendor/overlay'))
    for source, destination in pushes:
        steps.append(PushTask(source, destination))
    return CompoundTask(TASK_ENABLE_MULTIPLE_OVERLAYS, steps)
def _create_setup_idmap_path_task(idmaps, symlinks):
    """Build the compound task that prepares the device for the --path test.

    Installs the single-overlay setup, then recreates the *idmaps* and
    *symlinks* directories from scratch.
    """
    steps = [_create_enable_single_overlay_task()]
    # Remove both directories before creating them again.
    steps.append(RmTask(symlinks))
    steps.append(RmTask(idmaps))
    steps.append(MkdirTask(idmaps))
    steps.append(MkdirTask(symlinks))
    return CompoundTask(TASK_SETUP_IDMAP_PATH, steps)
def _create_setup_idmap_scan_task(idmaps, symlinks):
    """Build the compound task that prepares the device for the --scan test.

    Installs the single-overlay setup, recreates the *idmaps* and
    *symlinks* directories, then installs the multiple-overlay setup.
    """
    steps = [_create_enable_single_overlay_task()]
    # Remove both directories before creating them again.
    steps.append(RmTask(symlinks))
    steps.append(RmTask(idmaps))
    steps.append(MkdirTask(idmaps))
    steps.append(MkdirTask(symlinks))
    steps.append(_create_enable_multiple_overlays_task())
    return CompoundTask(TASK_SETUP_IDMAP_SCAN, steps)
def _handle_instrumentation_task_output(stdout, printer):
    """Replay raw ``am instrument -r`` output as individual test results.

    Each INSTRUMENTATION_STATUS_CODE line is reported through *printer*:
    code 1 starts a test, code 0 passes it, and any other code (including
    the negative ones) fails it with the most recently collected stack
    trace as the message.

    Args:
        stdout: complete text output of the instrumentation run.
        printer: object with begin(), end_pass() and end_fail() methods.

    Returns:
        The number of tests reported as failed.
    """
    # The sign is part of the captured group: -1 must not be read as 1.
    regex_status_code = re.compile(r'^INSTRUMENTATION_STATUS_CODE: (-?\d+)')
    regex_name = re.compile(r'^INSTRUMENTATION_STATUS: test=(.*)')
    regex_begin_stack = re.compile(r'^INSTRUMENTATION_STATUS: stack=(.*)')
    regex_end_stack = re.compile(r'^$')
    failed_tests = 0
    current_test = None
    current_stack = []
    mode_stack = False
    for line in stdout.split("\n"):
        line = line.rstrip()  # strip \r from adb output
        m = regex_status_code.match(line)
        if m:
            # A status code closes the status block, so no stack continues.
            mode_stack = False
            c = int(m.group(1))
            if c == 1:
                # New test: forget any stack left over from an earlier one.
                current_stack = []
                printer.begin(TASK_INSTRUMENTATION_TEST, current_test)
            elif c == 0:
                printer.end_pass(TASK_INSTRUMENTATION_TEST, current_test)
            else:
                failed_tests += 1
                msg = "\n".join(current_stack)
                printer.end_fail(TASK_INSTRUMENTATION_TEST, current_test, msg.rstrip() + '\n')
            continue
        m = regex_name.match(line)
        if m:
            current_test = m.group(1)
            continue
        m = regex_begin_stack.match(line)
        if m:
            mode_stack = True
            current_stack = []
            current_stack.append(" " + m.group(1))
            continue
        m = regex_end_stack.match(line)
        if m:
            # An empty line ends the stack trace.
            mode_stack = False
            continue
        if mode_stack:
            current_stack.append(" " + line.strip())
    return failed_tests
def _set_adb_device(option, opt, value, parser):
    """optparse callback: select the device by rewriting the global adb command.

    -d/--device and -e/--emulator append the matching adb flag;
    -s/--serial appends ``-s`` followed by the option value.
    """
    global adb
    fixed_commands = {
        '-d': 'adb -d',
        '--device': 'adb -d',
        '-e': 'adb -e',
        '--emulator': 'adb -e',
    }
    if opt in fixed_commands:
        adb = fixed_commands[opt]
    elif opt in ('-s', '--serial'):
        adb = 'adb -s ' + value
def _create_opt_parser():
    """Build the command line parser for the test runner.

    The device selection options (-d, -e, -s) take effect immediately
    through the _set_adb_device callback; all other options are stored on
    the parsed options object.

    Returns:
        The configured optparse.OptionParser.
    """
    parser = optparse.OptionParser()
    parser.add_option('-d', '--device', action='callback', callback=_set_adb_device,
                      help='pass -d to adb')
    parser.add_option('-e', '--emulator', action='callback', callback=_set_adb_device,
                      help='pass -e to adb')
    parser.add_option('-s', '--serial', type="str", action='callback', callback=_set_adb_device,
                      help='pass -s <serial> to adb')
    parser.add_option('-C', '--no-color', action='store_false',
                      dest='use_color', default=True,
                      help='disable color escape sequences in output')
    parser.add_option('-q', '--quiet', action='store_true',
                      dest='quiet_mode', default=False,
                      help='quiet mode, output only results')
    parser.add_option('-b', '--no-build', action='store_false',
                      dest='do_build', default=True,
                      help='do not rebuild test projects')
    parser.add_option('-k', '--continue', action='store_true',
                      dest='do_continue', default=False,
                      help='continue running tasks after a failure')
    parser.add_option('-i', '--test-idmap', action='store_true',
                      dest='test_idmap', default=False,
                      help='run tests for idmap')
    parser.add_option('-0', '--test-no-overlay', action='store_true',
                      dest='test_no_overlay', default=False,
                      help='run tests without any overlay')
    parser.add_option('-1', '--test-single-overlay', action='store_true',
                      dest='test_single_overlay', default=False,
                      help='run tests for single overlay')
    parser.add_option('-2', '--test-multiple-overlays', action='store_true',
                      dest='test_multiple_overlays', default=False,
                      help='run tests for multiple overlays')
    return parser
if __name__ == '__main__':
    # Script entry point: parse the options, build the ordered task list,
    # then run the tasks one by one and report each result.
    opt_parser = _create_opt_parser()
    opts, args = opt_parser.parse_args(sys.argv[1:])
    # No explicit test selection means "run every test group".
    if not (opts.test_idmap or opts.test_no_overlay or
            opts.test_single_overlay or opts.test_multiple_overlays):
        opts.test_idmap = True
        opts.test_no_overlay = True
        opts.test_single_overlay = True
        opts.test_multiple_overlays = True
    if len(args) > 0:
        opt_parser.error("unexpected arguments: %s" % " ".join(args))
        # will never reach this: opt_parser.error will call sys.exit
    if opts.quiet_mode:
        printer = QuietPrinter()
    else:
        printer = VerbosePrinter(opts.use_color)
    tasks = []
    # must be in the same directory as this script for compilation tasks to work
    script = sys.argv[0]
    dirname = os.path.dirname(script)
    wd = os.path.realpath(dirname)
    os.chdir(wd)
    # build test cases
    if opts.do_build:
        tasks.append(CompilationTask('OverlayTest/Android.mk'))
        tasks.append(CompilationTask('OverlayTestOverlay/Android.mk'))
        tasks.append(CompilationTask('OverlayAppFirst/Android.mk'))
        tasks.append(CompilationTask('OverlayAppSecond/Android.mk'))
    # remount filesystem, install test project
    tasks.append(RootTask())
    tasks.append(RemountTask())
    tasks.append(PushTask('/system/app/OverlayTest/OverlayTest.apk', '/system/app/OverlayTest.apk'))
    # test idmap
    if opts.test_idmap:
        idmaps = '/data/local/tmp/idmaps'
        symlinks = '/data/local/tmp/symlinks'
        # idmap --path
        tasks.append(StopTask())
        tasks.append(_create_setup_idmap_path_task(idmaps, symlinks))
        tasks.append(StartTask())
        tasks.append(IdmapPathTask('/vendor/overlay/framework_a.apk', '/system/framework/framework-res.apk', idmaps + '/a.idmap'))
        tasks.append(FileExistsTest(idmaps + '/a.idmap'))
        tasks.append(GrepIdmapTest(idmaps + '/a.idmap', 'bool/config_annoy_dianne', 1))
        # idmap --scan
        idmap = idmaps + '/vendor@overlay@framework_b.apk@idmap'
        tasks.append(StopTask())
        tasks.append(_create_setup_idmap_scan_task(idmaps, symlinks))
        tasks.append(StartTask())
        tasks.append(IdmapScanTask('/vendor/overlay', 'android', '/system/framework/framework-res.apk', idmaps, symlinks))
        tasks.append(FileExistsTest(idmap))
        tasks.append(GrepIdmapTest(idmap, 'bool/config_annoy_dianne', 1))
        # overlays.list
        overlays_list_path = idmaps + '/overlays.list'
        expected_content = '''\
/vendor/overlay/framework_b.apk /data/local/tmp/idmaps/vendor@overlay@framework_b.apk@idmap
'''
        tasks.append(FileExistsTest(overlays_list_path))
        tasks.append(Md5Test(overlays_list_path, expected_content))
        # idmap cleanup
        tasks.append(RmTask(symlinks))
        tasks.append(RmTask(idmaps))
    # test no overlay
    if opts.test_no_overlay:
        tasks.append(StopTask())
        tasks.append(_create_disable_overlays_task())
        tasks.append(StartTask())
        tasks.append(InstrumentationTask('com.android.overlaytest.WithoutOverlayTest'))
    # test single overlay
    if opts.test_single_overlay:
        tasks.append(StopTask())
        tasks.append(_create_enable_single_overlay_task())
        tasks.append(StartTask())
        tasks.append(InstrumentationTask('com.android.overlaytest.WithOverlayTest'))
    # test multiple overlays
    if opts.test_multiple_overlays:
        tasks.append(StopTask())
        tasks.append(_create_enable_multiple_overlays_task())
        tasks.append(StartTask())
        tasks.append(InstrumentationTask('com.android.overlaytest.WithMultipleOverlaysTest'))
    # Sum of the return codes of failed tasks that -k allowed us to skip.
    ignored_errors = 0
    for t in tasks:
        task_type = t.get_type()
        name = t.get_name()
        if task_type == TASK_INSTRUMENTATION:
            # InstrumentationTask will run several tests, but we want it
            # to appear as if each test was run individually. Calling
            # "am instrument" with a single test method is prohibitively
            # expensive, so let's instead post-process the output to
            # emulate individual calls.
            retcode, stdout, stderr = t.execute()
            if retcode != 0:
                # The run itself failed; this always aborts, even with -k.
                printer.begin(TASK_INSTRUMENTATION, name)
                # adb shell usually reports errors on stdout, so fall back
                # to it when stderr is empty.
                printer.end_fail(TASK_INSTRUMENTATION, name, stderr or stdout)
                sys.exit(retcode)
            failed_tests = _handle_instrumentation_task_output(stdout, printer)
            if failed_tests != 0:
                if not opts.do_continue:
                    # Cap at 255 so the exit status cannot wrap around to 0.
                    sys.exit(min(failed_tests, 255))
                ignored_errors += failed_tests
        else:
            printer.begin(task_type, name)
            retcode, stdout, stderr = t.execute()
            if retcode == 0:
                printer.end_pass(task_type, name)
            else:
                # hope for output from stdout instead (true for eg adb shell rm)
                printer.end_fail(task_type, name, stderr or stdout)
                if not opts.do_continue:
                    sys.exit(retcode)
                ignored_errors += retcode
    # Cap at 255 so the exit status cannot wrap around to 0.
    sys.exit(min(ignored_errors, 255))