#!/usr/bin/python3
#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import os
import re
import subprocess
import sys
import threading
import time
from subprocess import PIPE
# Helper class for controlling an Android device from Python.
# Every command is executed by spawning an `adb` child process.
class AndroidDevice(object):
    """Controls a single Android device by shelling out to ``adb``.

    Each method builds an ``adb -s <serial> ...`` command line, runs it as a
    child process and interprets the text that the command prints.
    """

    def __init__(self, serial):
        """Store the serial that is passed to ``adb -s`` on every command."""
        self._serial = serial

    @staticmethod
    def _decode(raw):
        """Convert raw command output bytes into stripped text.

        Bytes that are not valid UTF-8 are replaced rather than raising
        ``UnicodeDecodeError``, so unexpected output cannot abort a command.
        """
        return raw.decode('utf-8', errors='replace').strip()

    def _poll(self, check, timeout, wait_period):
        """Call ``check()`` repeatedly until it returns true or time runs out.

        The budget is counted in attempts (``attempts * wait_period <
        timeout``), not in wall-clock time, so the time spent inside
        ``check()`` itself is not charged against ``timeout``.

        Returns:
            True as soon as ``check()`` succeeds, False once the budget is
            exhausted (immediately when ``timeout`` is 0 or negative).

        Raises:
            ValueError: if ``wait_period`` is not positive, because the loop
                could then never terminate.
        """
        if wait_period <= 0:
            raise ValueError("wait_period must be positive")
        attempts = 0
        while attempts * wait_period < timeout:
            if check():
                return True
            time.sleep(wait_period)
            attempts += 1
        return False

    def run_adb_command(self, cmd, timeout=None):
        """Run ``adb -s <serial> <cmd>`` and return ``(stdout, stderr)``.

        Args:
            cmd: adb arguments as one string. The full command line is split
                on whitespace, so a single argument cannot contain spaces.
            timeout: seconds to wait for adb to finish, or None to wait
                without limit.

        Returns:
            A tuple of two stripped strings: decoded stdout and stderr.

        Raises:
            subprocess.TimeoutExpired: if adb does not finish in time.
        """
        adb_cmd = "adb -s %s %s" % (self._serial, cmd)
        print(adb_cmd)
        # subprocess.run() kills and reaps the child when the timeout expires;
        # a bare Popen.communicate(timeout=...) would leave adb running.
        completed = subprocess.run(
            adb_cmd.split(),
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            timeout=timeout,
        )
        return self._decode(completed.stdout), self._decode(completed.stderr)

    def run_shell_command(self, cmd):
        """Run ``adb shell <cmd>`` and return ``(stdout, stderr)``."""
        return self.run_adb_command("shell %s" % cmd)

    def wait_for_device(self, timeout=30):
        """Run ``adb wait-for-device`` with the given timeout in seconds.

        Returns:
            The ``(stdout, stderr)`` tuple of the adb command.

        Raises:
            subprocess.TimeoutExpired: if adb does not return in time.
        """
        return self.run_adb_command('wait-for-device', timeout)

    def wait_for_prop(self, key, value, timeout=30, wait_period=1):
        """Poll ``getprop <key>`` until its output equals ``value``.

        Args:
            key: system property name.
            value: expected property value (compared to the stripped output).
            timeout: polling budget in seconds.
            wait_period: seconds to sleep between attempts; must be positive.

        Returns:
            True if the property reached ``value`` within the budget, else
            False (a message is printed in that case).
        """
        def prop_matches():
            out, _ = self.run_shell_command("getprop %s" % key)
            return out == value

        matched = self._poll(prop_matches, timeout, wait_period)
        if not matched:
            print("%s not set to %s within timeout!" % (key, value))
        return matched

    def wait_for_service(self, name, timeout=30, wait_period=1):
        """Poll ``service check <name>`` until the service is reported found.

        Args:
            name: service name passed to ``service check``.
            timeout: polling budget in seconds.
            wait_period: seconds to sleep between attempts; must be positive.

        Returns:
            True if the service was reported within the budget, else False
            (a message is printed in that case).
        """
        def service_registered():
            output, _ = self.run_shell_command("service check %s" % name)
            # Require the positive "found" marker: empty output (e.g. adb
            # itself failed) must not be mistaken for a running service.
            return 'found' in output and 'not found' not in output

        registered = self._poll(service_registered, timeout, wait_period)
        if not registered:
            print("Service '%s' not found within timeout!" % name)
        return registered

    def wait_for_boot_complete(self, timeout=60):
        """Wait until the ``dev.bootcomplete`` property reads ``1``."""
        return self.wait_for_prop('dev.bootcomplete', '1', timeout)

    def install_apk(self, apk_path):
        """Install an APK with ``adb install -r -d -g``.

        Waits for the ``package`` service first; the install is attempted
        even if that wait times out.

        Returns:
            ``(stdout, stderr, success)`` where ``success`` is True when the
            word ``Success`` appears as a token in stdout.
        """
        self.wait_for_service('package')
        (out, err) = self.run_adb_command("install -r -d -g %s" % apk_path)
        return out, err, "Success" in out.split()

    def uninstall_package(self, package):
        """Uninstall ``package`` with ``adb uninstall``.

        Returns:
            True when the word ``Success`` appears as a token in stdout.
        """
        self.wait_for_service('package')
        (out, _) = self.run_adb_command("uninstall %s" % package)
        return "Success" in out.split()

    def run_instrumentation_test(self, option):
        """Run ``am instrument -w --no-window-animation <option>``.

        Waits for the ``activity`` service first.

        Returns:
            The ``(stdout, stderr)`` tuple of the shell command.
        """
        self.wait_for_service('activity')
        return self.run_shell_command(
            "am instrument -w --no-window-animation %s" % option)

    def is_process_alive(self, process_name):
        """Return True if ``process_name`` is a token in the ``ps`` output.

        This is a deliberately simple check: every whitespace-separated token
        of the output is considered, including columns such as the uid, so
        only use distinctive names like ``com.android.xyz``.
        """
        (out, _) = self.run_shell_command("ps")
        return process_name in out.split()

    def get_version_sdk(self):
        """Return ``ro.build.version.sdk`` as an int.

        Raises:
            ValueError: if the property output is not an integer.
        """
        return int(self.run_shell_command("getprop ro.build.version.sdk")[0])

    def get_version_codename(self):
        """Return the ``ro.build.version.codename`` property as text."""
        return self.run_shell_command(
            "getprop ro.build.version.codename")[0].strip()

    def get_density(self):
        """Return the LCD density as an int.

        Reads ``qemu.sf.lcd_density`` when the serial contains ``emulator``
        and ``ro.sf.lcd_density`` otherwise.

        Raises:
            ValueError: if the property output is not an integer.
        """
        if "emulator" in self._serial:
            prop = "qemu.sf.lcd_density"
        else:
            prop = "ro.sf.lcd_density"
        return int(self.run_shell_command("getprop %s" % prop)[0])

    def get_orientation(self):
        """Return the surface orientation reported by ``dumpsys`` as an int.

        The value is taken from the second whitespace-separated token of the
        ``dumpsys | grep SurfaceOrientation`` output.

        Raises:
            IndexError: if the output has fewer than two tokens.
            ValueError: if the second token is not an integer.
        """
        out = self.run_shell_command("dumpsys | grep SurfaceOrientation")[0]
        return int(out.split()[1])
def enumerate_android_devices(require_prefix='', adb_output=None):
    """Return the serials of attached devices that are in the ``device`` state.

    Args:
        require_prefix: only serials starting with this string are returned.
        adb_output: optional pre-captured output of ``adb devices`` (bytes or
            str). When None, ``adb devices`` is executed to obtain it.

    Returns:
        A list of serial strings, in the order adb printed them. Entries in
        any other state (e.g. ``offline``) are skipped.

    Raises:
        subprocess.CalledProcessError: if ``adb devices`` is executed and
            exits with a non-zero status.
    """
    if adb_output is None:
        adb_output = subprocess.check_output(["adb", "devices"])
    if isinstance(adb_output, bytes):
        adb_output = adb_output.decode('UTF-8')

    device_list = []
    # The first line is skipped (as a header); each following entry is
    # "<serial>\t<state>". splitlines() also copes with CRLF line endings.
    for line in adb_output.splitlines()[1:]:
        fields = line.strip().split('\t')
        # Blank or malformed lines have no state column; skip them instead
        # of failing on a missing index.
        if len(fields) < 2:
            continue
        serial, state = fields[0], fields[1]
        if serial.startswith(require_prefix) and state == "device":
            device_list.append(serial)
    return device_list