#
# Copyright (C) 2016 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import base64
import concurrent.futures
import datetime
import functools
import json
import logging
import os
import random
import re
import signal
import string
import subprocess
import time
import traceback
# File name length is limited to 255 chars on some OSes, so we need to make
# sure the file names we output fit within the limit.
MAX_FILENAME_LEN = 255
# Path length is limited to 4096 chars on some OSes, so we need to make sure
# the paths we output fit within the limit.
MAX_PATH_LEN = 4096
class VTSUtilsError(Exception):
    """Generic error raised by the utility functions in this module."""
class NexusModelNames:
    """String constants naming device models.

    NOTE(review): the values look like Android device code names; confirm
    against the callers before relying on that.
    """
    # TODO(angli): This will be fixed later by angli.
    ONE = 'sprout'
    N5 = 'hammerhead'
    N5v2 = 'bullhead'
    N6 = 'shamu'
    N6v2 = 'angler'
# Alphabet that rand_ascii_str draws its characters from.
ascii_letters_and_digits = string.ascii_letters + string.digits
# Presumably the set of characters allowed in generated file names; it is not
# referenced in the visible part of this file -- TODO confirm against callers.
valid_filename_chars = "-_." + ascii_letters_and_digits
# Model names that trim_model_name returns unchanged.
models = ("sprout", "occam", "hammerhead", "bullhead", "razor", "razorg",
          "shamu", "angler", "volantis", "volantisg", "mantaray", "fugu",
          "ryu")
# Alternative names that trim_model_name translates into an entry of `models`.
manufacture_name_to_model = {
    "flo": "razor",
    "flo_lte": "razorg",
    "flounder": "volantis",
    "flounder_lte": "volantisg",
    "dragon": "ryu"
}
# Maps a "GMT<sign><whole hours>" label to one Olson time zone ID.
# get_timezone_olson_id indexes this table directly, so only the labels listed
# here (GMT-11 through GMT+13) can be resolved.
GMT_to_olson = {
    "GMT-9": "America/Anchorage",
    "GMT-8": "US/Pacific",
    "GMT-7": "US/Mountain",
    "GMT-6": "US/Central",
    "GMT-5": "US/Eastern",
    "GMT-4": "America/Barbados",
    "GMT-3": "America/Buenos_Aires",
    "GMT-2": "Atlantic/South_Georgia",
    "GMT-1": "Atlantic/Azores",
    "GMT+0": "Africa/Casablanca",
    "GMT+1": "Europe/Amsterdam",
    "GMT+2": "Europe/Athens",
    "GMT+3": "Europe/Moscow",
    "GMT+4": "Asia/Baku",
    "GMT+5": "Asia/Oral",
    "GMT+6": "Asia/Almaty",
    "GMT+7": "Asia/Bangkok",
    "GMT+8": "Asia/Hong_Kong",
    "GMT+9": "Asia/Tokyo",
    "GMT+10": "Pacific/Guam",
    "GMT+11": "Pacific/Noumea",
    "GMT+12": "Pacific/Fiji",
    "GMT+13": "Pacific/Tongatapu",
    "GMT-11": "Pacific/Midway",
    "GMT-10": "Pacific/Honolulu"
}
def abs_path(path):
    """Turn a path that may contain '~' or '.' into an absolute path.

    Args:
        path: The path to expand.

    Returns:
        The absolute, normalised form of the input path.
    """
    expanded = os.path.expanduser(path)
    return os.path.abspath(expanded)
def create_dir(path):
    """Creates a directory if it does not exist already.

    Missing parent directories are created as well. The call is safe when
    several threads or processes try to create the same directory at the same
    time.

    Args:
        path: The path of the directory to create.
    """
    full_path = abs_path(path)
    try:
        # exist_ok avoids the check-then-create race of testing
        # os.path.exists() first and calling makedirs() afterwards.
        os.makedirs(full_path, exist_ok=True)
    except FileExistsError:
        # Something that is not a directory already occupies the path. An
        # existing path has always been left untouched by this function, so
        # only re-raise when the path does not actually resolve (for example
        # a dangling symlink).
        if not os.path.exists(full_path):
            raise
def get_current_epoch_time():
    """Reads the system clock as an epoch timestamp in milliseconds.

    Returns:
        An integer representing the current epoch time in milliseconds.
    """
    seconds_since_epoch = time.time()
    return int(round(seconds_since_epoch * 1000))
def get_current_human_time():
    """Formats the current local time for humans.

    Returns:
        The current time stamp in Month-Day-Year Hour:Min:Sec format, followed
        by a single trailing space.
    """
    stamp_format = "%m-%d-%Y %H:%M:%S "
    return time.strftime(stamp_format)
def epoch_to_human_time(epoch_time):
    """Converts an epoch timestamp to human readable time.

    This essentially converts an output of get_current_epoch_time to an output
    of get_current_human_time.

    Args:
        epoch_time: An integer representing an epoch timestamp in milliseconds.

    Returns:
        A time string representing the input time in the local time zone.
        None if the input is not an integer or cannot be represented as a
        date.
    """
    if not isinstance(epoch_time, int):
        return None
    try:
        d = datetime.datetime.fromtimestamp(epoch_time / 1000)
        return d.strftime("%m-%d-%Y %H:%M:%S ")
    except (ValueError, OverflowError, OSError):
        # Out-of-range timestamps surface as OverflowError or OSError as well
        # as ValueError, depending on magnitude and platform; all of them
        # mean the input is invalid.
        return None
def get_timezone_olson_id():
    """Looks up an Olson ID for the local (non-DST) timezone.

    The offset is taken from time.timezone and truncated to whole hours, so
    zones with a fractional-hour offset resolve to the entry of the truncated
    hour.

    Returns:
        A string representing one of the Olson IDs of the local (non-DST)
        timezone.

    Raises:
        KeyError: The computed "GMT+N"/"GMT-N" label has no entry in
            GMT_to_olson.
    """
    # time.timezone counts seconds *west* of UTC, hence the sign flip below.
    hours_west = int(time.timezone / 3600)
    if hours_west > 0:
        label = "GMT-{}".format(hours_west)
    else:
        label = "GMT+{}".format(-hours_west)
    return GMT_to_olson[label]
def find_files(paths, file_predicate):
    """Collects the files under the given directories that a predicate accepts.

    Every directory is walked recursively.

    Args:
        paths: A list of directory paths where to find the files.
        file_predicate: A function called as file_predicate(name, ext) that
            returns True if the file name and extension are desired.

    Returns:
        A list of (directory, name, extension) tuples, one per matching file.
    """
    matches = []
    for search_root in paths:
        for dir_path, _, file_names in os.walk(abs_path(search_root)):
            for file_name in file_names:
                stem, extension = os.path.splitext(file_name)
                if file_predicate(stem, extension):
                    matches.append((dir_path, stem, extension))
    return matches
def iterate_files(dir_path):
    """Walks a directory tree and yields every file found in it.

    Args:
        dir_path: A string representing the path to search.

    Yields:
        A tuple of strings (directory, file). The directory containing
        the file and the file name.
    """
    for parent, _subdirs, names in os.walk(dir_path):
        yield from ((parent, name) for name in names)
def load_config(file_full_path):
    """Loads a JSON config file.

    If the given path is not an existing file, each entry of the PYTHONPATH
    environment variable is tried as a base directory and the first match is
    loaded instead.

    Args:
        file_full_path: Path to the JSON file, absolute or relative.

    Returns:
        A JSON object.

    Raises:
        OSError: The file could not be found or opened.
        ValueError: The file content is not valid JSON.
    """
    if not os.path.isfile(file_full_path):
        logging.warning('cwd: %s', os.getcwd())
        # PYTHONPATH may be unset; .get() avoids a KeyError in that case.
        pypath = os.environ.get('PYTHONPATH')
        if pypath:
            # os.pathsep is ':' on POSIX and ';' on Windows, where ':' is
            # part of drive-letter paths.
            for base_path in pypath.split(os.pathsep):
                logging.info('checking %s', base_path)
                new_path = os.path.join(base_path, file_full_path)
                if os.path.isfile(new_path):
                    logging.info('found')
                    file_full_path = new_path
                    break
    with open(file_full_path, 'r') as f:
        conf = json.load(f)
    return conf
def load_file_to_base64_str(f_path):
    """Reads a file and returns its content encoded as base64 text.

    Args:
        f_path: full path to the file including the file name.

    Returns:
        A str holding the base64 encoding of the raw bytes of the file.
    """
    with open(abs_path(f_path), 'rb') as source:
        encoded = base64.b64encode(source.read())
    return encoded.decode("utf-8")
def find_field(item_list, cond, comparator, target_field):
    """Returns a field of the first dict that satisfies a condition.

    An item only counts as a match if it also contains target_field; matching
    items without that field are skipped.

    Args:
        item_list: A list of dict objects.
        cond: A param that defines the condition.
        comparator: A function called as comparator(item, cond) that checks
            if a dict satisfies the condition.
        target_field: Name of the field whose value is returned if an item
            satisfies the condition.

    Returns:
        Target value or None if no item satisfies the condition.
    """
    candidates = (
        entry[target_field]
        for entry in item_list
        if comparator(entry, cond) and target_field in entry
    )
    return next(candidates, None)
def rand_ascii_str(length):
    """Builds a random string out of ascii letters and digits.

    Args:
        length: The number of characters in the string.

    Returns:
        The random string generated.
    """
    return ''.join(
        random.choice(ascii_letters_and_digits) for _ in range(length))
# Thread/Process related functions.
def concurrent_exec(func, param_list):
    """Runs a function once per parameter set on a pool of threads.

    Each element of param_list (should be an iterable) is unpacked and passed
    into the function. Due to Python's GIL there is no true concurrency, so
    this is suited for IO-bound tasks.

    Args:
        func: The function that performs a task.
        param_list: A list of iterables, each being a set of params to be
            passed into the function.

    Returns:
        A list with one entry per function execution, collected in the order
        the executions finish. If an execution raised an exception, the
        exception object is stored as its entry.
    """
    results = []
    with concurrent.futures.ThreadPoolExecutor(max_workers=30) as pool:
        # Remember which params each future was started with so failures can
        # be reported together with their input.
        params_by_future = {}
        for params in param_list:
            params_by_future[pool.submit(func, *params)] = params
        for finished in concurrent.futures.as_completed(params_by_future):
            failed_params = params_by_future[finished]
            try:
                outcome = finished.result()
            except Exception as exc:
                print("{} generated an exception: {}".format(
                    failed_params, traceback.format_exc()))
                outcome = exc
            results.append(outcome)
    return results
def exe_cmd(*cmds):
    """Runs a command line in a new shell and waits for it to finish.

    The arguments are joined with single spaces and handed to the shell as one
    command line. Failure is detected from stderr only: any output on stderr
    is treated as an error and the exit code is not inspected.

    Args:
        cmds: A sequence of commands and arguments.

    Returns:
        The stdout output of the command run, as bytes.

    Raises:
        OSError is raised with the stderr content if the command wrote
        anything to stderr.
    """
    command_line = ' '.join(cmds)
    process = subprocess.Popen(
        command_line,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    stdout_data, stderr_data = process.communicate()
    if stderr_data:
        raise OSError(stderr_data)
    return stdout_data
def _assert_subprocess_running(proc):
    """Verifies that a subprocess is still alive.

    Args:
        proc: A subprocess returned by subprocess.Popen.

    Raises:
        VTSUtilsError is raised if the subprocess has stopped; the message
        carries its pid, return code, stderr and stdout.
    """
    exit_code = proc.poll()
    if exit_code is None:
        # Still running, nothing to report.
        return
    stdout_data, stderr_data = proc.communicate()
    raise VTSUtilsError("Process %d has terminated. ret: %d, stderr: %s,"
                        " stdout: %s" % (proc.pid, exit_code, stderr_data,
                                         stdout_data))
def is_on_windows():
    """Tells whether this code is running on Windows.

    Returns:
        A boolean representing whether the OS is Windows.
    """
    windows_os_name = "nt"
    return os.name == windows_os_name
def start_standing_subprocess(cmd, check_health_delay=0):
    """Launches a long-running subprocess without waiting for it.

    This is not a blocking call and the subprocess started by it should be
    explicitly terminated with stop_standing_subprocess.

    For short-running commands, you should use exe_cmd, which blocks.

    The subprocess is started through the shell in its own process group, with
    stdout and stderr captured in pipes. You can specify a health check after
    the subprocess is started to make sure it did not stop prematurely.

    Args:
        cmd: string, the command to start the subprocess with.
        check_health_delay: float, the number of seconds to wait after the
                            subprocess starts to check its health. Default is
                            0, which means no check.

    Returns:
        The subprocess that got started.

    Raises:
        VTSUtilsError: The health check found the subprocess terminated.
    """
    popen_options = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.PIPE,
        "shell": True,
    }
    # The platform specific attribute is only looked up on the platform that
    # provides it.
    if is_on_windows():
        popen_options["creationflags"] = subprocess.CREATE_NEW_PROCESS_GROUP
    else:
        popen_options["preexec_fn"] = os.setpgrp
    proc = subprocess.Popen(cmd, **popen_options)
    logging.debug("Start standing subprocess with cmd: %s", cmd)
    if check_health_delay > 0:
        time.sleep(check_health_delay)
        _assert_subprocess_running(proc)
    return proc
def stop_standing_subprocess(proc, kill_signal=signal.SIGTERM):
    """Signals a subprocess started by start_standing_subprocess to stop.

    Before killing the process, we check if the process is running; if it has
    terminated, VTSUtilsError is raised.

    On non-Windows systems the signal is sent to the whole process group, and
    a PermissionError from that call (seen on Macs) is caught and logged.

    On Windows, SIGABRT, SIGINT, and SIGTERM are replaced with
    CTRL_BREAK_EVENT so as to kill every subprocess in the group.

    Args:
        proc: Subprocess to terminate.
        kill_signal: The signal sent to the subprocess group.

    Raises:
        VTSUtilsError: The subprocess had already terminated.
    """
    pid = proc.pid
    logging.debug("Stop standing subprocess %d", pid)
    _assert_subprocess_running(proc)
    if is_on_windows():
        replaced_signals = (signal.SIGABRT, signal.SIGINT, signal.SIGTERM)
        if kill_signal in replaced_signals:
            windows_signal = signal.CTRL_BREAK_EVENT
        else:
            windows_signal = kill_signal
        os.kill(pid, windows_signal)
        return
    try:
        os.killpg(pid, kill_signal)
    except PermissionError as e:
        logging.warning("os.killpg(%d, %s) PermissionError: %s",
                        pid, str(kill_signal), str(e))
def wait_for_standing_subprocess(proc, timeout=None):
    """Blocks until a subprocess started by start_standing_subprocess ends.

    This is a thin wrapper around proc.wait() and propagates whatever that
    call raises. The subprocess.TimeoutExpired exception is raised if the
    process timed out rather than terminating.

    If no exception is raised: the subprocess terminated on its own. No need
    to call stop_standing_subprocess() to kill it.

    If an exception is raised: the subprocess is still alive - it did not
    terminate. Either call stop_standing_subprocess() to kill it, or call
    wait_for_standing_subprocess() to keep waiting for it to terminate on its
    own.

    Args:
        proc: Subprocess to wait for.
        timeout: An integer number of seconds to wait before timing out.
            None waits indefinitely.
    """
    proc.wait(timeout=timeout)
def sync_device_time(ad):
    """Copies the host's timezone and clock onto an android device.

    The timezone is set first, then the epoch time.

    Args:
        ad: The android device to sync time on.
    """
    droid = ad.droid
    olson_id = get_timezone_olson_id()
    droid.setTimeZone(olson_id)
    # Read the clock as late as possible so the value pushed is fresh.
    droid.setTime(get_current_epoch_time())
# Timeout decorator block
class TimeoutError(Exception):
    """Exception for timeout decorator related errors.

    Note that this class shadows the built-in TimeoutError inside this module.
    """


def _timeout_handler(signum, frame):
    """Handler function used by signal to terminate a timed out function.

    Args:
        signum: The signal number delivered (unused).
        frame: The stack frame that was interrupted (unused).

    Raises:
        TimeoutError: Always.
    """
    raise TimeoutError()


def timeout(sec):
    """A decorator used to add time out check to a function.

    The check is implemented with SIGALRM. While the decorated function runs,
    the SIGALRM handler is replaced; the previous handler is put back when the
    call finishes. When sec is 0 no signal state is touched at all.

    Args:
        sec: Number of seconds to wait before the function times out.
             No timeout if set to 0

    Returns:
        What the decorated function returns.

    Raises:
        TimeoutError is raised when time out happens.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            armed = bool(sec)
            previous_handler = None
            if armed:
                previous_handler = signal.signal(signal.SIGALRM,
                                                 _timeout_handler)
                signal.alarm(sec)
            try:
                return func(*args, **kwargs)
            except TimeoutError:
                raise TimeoutError(("Function {} timed out after {} "
                                    "seconds.").format(func.__name__, sec))
            finally:
                if armed:
                    # Cancel the pending alarm before handing the signal back
                    # so it cannot fire into the restored handler.
                    signal.alarm(0)
                    # signal.signal() returns None when the old handler was
                    # not installed from Python; that cannot be re-installed.
                    if previous_handler is not None:
                        signal.signal(signal.SIGALRM, previous_handler)

        return wrapper

    return decorator
def trim_model_name(model):
    """Strips prefixes and postfixes from a model name to find the android
    designation of the model.

    e.g. "m_shamu" will be trimmed to "shamu".

    The whole name is looked up first; if that fails, the name is split on
    '_' and '-' and the first token that resolves wins.

    Args:
        model: model name to be trimmed.

    Returns:
        Trimmed model name if one of the known model names is found.
        None otherwise.
    """

    def resolve(name):
        # A known model maps to itself, a manufacture name to its model.
        if name in models:
            return name
        return manufacture_name_to_model.get(name)

    resolved = resolve(model)
    if resolved is not None:
        return resolved
    for token in re.split("_|-", model):
        resolved = resolve(token)
        if resolved is not None:
            return resolved
    return None
def force_airplane_mode(ad, new_state, timeout_value=60):
    """Switches airplane mode on or off through an adb shell command.

    The device is waited for first, for at most timeout_value seconds.

    Args:
        ad: android device object.
        new_state: Turn on airplane mode if True.
            Turn off airplane mode if False.
        timeout_value: max wait time for 'adb wait-for-device'

    Returns:
        True if success.
        False if timeout.
    """

    # 'adb wait-for-device' can block forever, so it runs under the timeout
    # decorator, which raises TimeoutError after <timeout_value> seconds.
    @timeout(timeout_value)
    def wait_for_device_with_timeout(ad):
        ad.adb.wait_for_device()

    airplane_flag = 1 if new_state else 0
    try:
        wait_for_device_with_timeout(ad)
        ad.adb.shell(
            "settings put global airplane_mode_on {}".format(airplane_flag))
    except TimeoutError:
        # adb wait for device timeout
        return False
    return True
def enable_doze(ad):
    """Pushes the device into doze mode.

    The device is put on (simulated) battery, device idle is enabled and
    forced, the device is sent to sleep and, after five seconds, the idle
    state machine is stepped once. When that step does not report an idle
    state, diagnostic dumps are printed.

    Args:
        ad: android device object.

    Returns:
        True if device is in doze mode.
        False otherwise.
    """
    ad.adb.shell("dumpsys battery unplug")
    ad.adb.shell("dumpsys deviceidle enable")
    force_idle_output = ad.adb.shell("dumpsys deviceidle force-idle")
    if force_idle_output != b'Now forced in to idle mode\r\n':
        return False
    ad.droid.goToSleepNow()
    time.sleep(5)
    step_output = ad.adb.shell("dumpsys deviceidle step")
    idle_reports = (b'Stepped to: IDLE_MAINTENANCE\r\n',
                    b'Stepped to: IDLE\r\n')
    if step_output in idle_reports:
        return True
    step_text = step_output.decode('utf-8')
    battery_text = ad.adb.shell("dumpsys battery").decode('utf-8')
    deviceidle_text = ad.adb.shell("dumpsys deviceidle").decode('utf-8')
    print("dumpsys deviceidle step: {}dumpsys battery: {}"
          "dumpsys deviceidle: {}".format(step_text, battery_text,
                                          deviceidle_text))
    return False
def disable_doze(ad):
    """Takes the device out of doze mode.

    Device idle is disabled, the battery state is reset and the idle state
    machine is stepped once. When that step does not report ACTIVE, diagnostic
    dumps are printed.

    Args:
        ad: android device object.

    Returns:
        True if device is not in doze mode.
        False otherwise.
    """
    ad.adb.shell("dumpsys deviceidle disable")
    ad.adb.shell("dumpsys battery reset")
    step_output = ad.adb.shell("dumpsys deviceidle step")
    if step_output == b'Stepped to: ACTIVE\r\n':
        return True
    step_text = step_output.decode('utf-8')
    battery_text = ad.adb.shell("dumpsys battery").decode('utf-8')
    deviceidle_text = ad.adb.shell("dumpsys deviceidle").decode('utf-8')
    print("dumpsys deviceidle step: {}dumpsys battery: {}"
          "dumpsys deviceidle: {}".format(step_text, battery_text,
                                          deviceidle_text))
    return False
def set_ambient_display(ad, new_state):
    """Turns "Ambient Display" in Settings->Display on or off.

    Writes 1 or 0 to the secure setting "doze_enabled" through adb shell.

    Args:
        ad: android device object.
        new_state: new state for "Ambient Display". True or False.
    """
    flag = 1 if new_state else 0
    ad.adb.shell("settings put secure doze_enabled {}".format(flag))
def set_adaptive_brightness(ad, new_state):
    """Turns "Adaptive Brightness" in Settings->Display on or off.

    Writes 1 or 0 to the system setting "screen_brightness_mode" through adb
    shell.

    Args:
        ad: android device object.
        new_state: new state for "Adaptive Brightness". True or False.
    """
    flag = 1 if new_state else 0
    ad.adb.shell(
        "settings put system screen_brightness_mode {}".format(flag))
def set_auto_rotate(ad, new_state):
    """Turns "Auto-rotate" in QuickSetting on or off.

    Writes 1 or 0 to the system setting "accelerometer_rotation" through adb
    shell.

    Args:
        ad: android device object.
        new_state: new state for "Auto-rotate". True or False.
    """
    flag = 1 if new_state else 0
    ad.adb.shell(
        "settings put system accelerometer_rotation {}".format(flag))
def set_location_service(ad, new_state):
    """Turns the Location service in Settings->Location on or off.

    Both the gps and the network provider are added to, or removed from, the
    secure setting "location_providers_allowed" through adb shell.

    Args:
        ad: android device object.
        new_state: new state for "Location service".
            If new_state is False, turn off location service.
            If new_state is True, set location service to "High accuracy".
    """
    if new_state:
        commands = (
            "settings put secure location_providers_allowed +gps",
            "settings put secure location_providers_allowed +network",
        )
    else:
        commands = (
            "settings put secure location_providers_allowed -gps",
            "settings put secure location_providers_allowed -network",
        )
    for command in commands:
        ad.adb.shell(command)
def set_mobile_data_always_on(ad, new_state):
    """Sets the Mobile_Data_Always_On feature bit.

    Writes 1 or 0 to the global setting "mobile_data_always_on" through adb
    shell.

    Args:
        ad: android device object.
        new_state: new state for "mobile_data_always_on"
            if new_state is False, set mobile_data_always_on disabled.
            if new_state is True, set mobile_data_always_on enabled.
    """
    flag = 1 if new_state else 0
    ad.adb.shell(
        "settings put global mobile_data_always_on {}".format(flag))