# # Copyright (C) 2017 The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # import cmd import datetime import imp # Python v2 compatibility import multiprocessing import multiprocessing.pool import os import re import signal import socket import sys import threading import time import urlparse from host_controller import common from host_controller.command_processor import command_build from host_controller.command_processor import command_config from host_controller.command_processor import command_copy from host_controller.command_processor import command_device from host_controller.command_processor import command_exit from host_controller.command_processor import command_fetch from host_controller.command_processor import command_flash from host_controller.command_processor import command_gsispl from host_controller.command_processor import command_info from host_controller.command_processor import command_lease from host_controller.command_processor import command_list from host_controller.command_processor import command_retry from host_controller.command_processor import command_request from host_controller.command_processor import command_test from host_controller.command_processor import command_upload from host_controller.build import build_provider_ab from host_controller.build import build_provider_gcs from host_controller.build import build_provider_local_fs from host_controller.build import build_provider_pab from host_controller.utils.ipc import shared_dict from host_controller.vti_interface import vti_endpoint_client COMMAND_PROCESSORS = [ command_build.CommandBuild, command_config.CommandConfig, command_copy.CommandCopy, command_device.CommandDevice, command_exit.CommandExit, command_fetch.CommandFetch, command_flash.CommandFlash, command_gsispl.CommandGsispl, command_info.CommandInfo, command_lease.CommandLease, command_list.CommandList, command_retry.CommandRetry, command_request.CommandRequest, command_test.CommandTest, command_upload.CommandUpload, ] class NonDaemonizedProcess(multiprocessing.Process): """Process class which is not daemonized.""" def _get_daemon(self): return False def _set_daemon(self, value): pass daemon = property(_get_daemon, _set_daemon) class NonDaemonizedPool(multiprocessing.pool.Pool): """Pool class which is not daemonized.""" Process = NonDaemonizedProcess def JobMain(vti_address, in_queue, out_queue, device_status): """Main() for a child process that executes a leased job. Currently, lease jobs must use VTI (not TFC). Args: vti_client: VtiEndpointClient needed to create Console. in_queue: Queue to get new jobs. out_queue: Queue to put execution results. device_status: SharedDict, contains device status information. shared between processes. """ if not vti_address: print("vti address is not set. example : $ run --vti=<url>") return def SigTermHandler(signum, frame): """Signal handler for exiting pool process explicitly. Added to resolve orphaned pool process issue. """ sys.exit(0) signal.signal(signal.SIGTERM, SigTermHandler) vti_client = vti_endpoint_client.VtiEndpointClient(vti_address) console = Console( vti_client, None, build_provider_pab.BuildProviderPAB(), None, job_pool=True) console.device_status = device_status multiprocessing.util.Finalize(console, console.__exit__, exitpriority=0) while True: command = in_queue.get() if command == "exit": break elif command == "lease": filepath, kwargs = vti_client.LeaseJob(socket.gethostname(), True) print("Job %s -> %s" % (os.getpid(), kwargs)) if filepath is not None: # TODO: redirect console output and add # console command to access them. for serial in kwargs["serial"]: console.device_status[serial] = common._DEVICE_STATUS_DICT[ "use"] print_to_console = True if not print_to_console: sys.stdout = out sys.stderr = err ret = console.ProcessConfigurableScript( os.path.join(os.getcwd(), "host_controller", "campaigns", filepath), **kwargs) if ret: job_status = "complete" else: job_status = "infra-err" vti_client.StopHeartbeat(job_status) print("Job execution complete. " "Setting job status to {}".format(job_status)) if not print_to_console: sys.stdout = sys.__stdout__ sys.stderr = sys.__stderr__ for serial in kwargs["serial"]: console.device_status[serial] = common._DEVICE_STATUS_DICT[ "ready"] else: print("Unknown job command %s" % command) class Console(cmd.Cmd): """The console for host controllers. Attributes: command_processors: dict of string:BaseCommandProcessor, map between command string and command processors. device_image_info: dict containing info about device image files. prompt: The prompt string at the beginning of each command line. test_result: dict containing info about the last test result. test_suite_info: dict containing info about test suite package files. tools_info: dict containing info about custom tool files. scheduler_thread: dict containing threading.Thread instances(s) that update configs regularly. _build_provider_pab: The BuildProviderPAB used to download artifacts. _vti_address: string, VTI service URI. _vti_client: VtiEndpoewrClient, used to upload data to a test scheduling infrastructure. _tfc_client: The TfcClient that the host controllers connect to. _hosts: A list of HostController objects. _in_file: The input file object. _out_file: The output file object. _serials: A list of string where each string is a device serial. _device_status: SharedDict, shared with process pool. contains status data on each devices. _job_pool: bool, True if Console is created from job pool process context. """ def __init__(self, vti_endpoint_client, tfc, pab, host_controllers, vti_address=None, in_file=sys.stdin, out_file=sys.stdout, job_pool=False): """Initializes the attributes and the parsers.""" # cmd.Cmd is old-style class. cmd.Cmd.__init__(self, stdin=in_file, stdout=out_file) self._build_provider = {} self._build_provider["pab"] = pab self._job_pool = job_pool if not self._job_pool: self._build_provider[ "local_fs"] = build_provider_local_fs.BuildProviderLocalFS() self._build_provider["gcs"] = build_provider_gcs.BuildProviderGCS() self._build_provider["ab"] = build_provider_ab.BuildProviderAB() self._vti_endpoint_client = vti_endpoint_client self._vti_address = vti_address self._tfc_client = tfc self._hosts = host_controllers self._in_file = in_file self._out_file = out_file self.prompt = "> " self.command_processors = {} self.device_image_info = {} self.test_result = {} self.test_suite_info = {} self.tools_info = {} self.fetch_info = {} self.test_results = {} self._device_status = shared_dict.SharedDict() if common._ANDROID_SERIAL in os.environ: self._serials = [os.environ[common._ANDROID_SERIAL]] else: self._serials = [] self.InitCommandModuleParsers() self.SetUpCommandProcessors() def __exit__(self): """Finalizes the build provider attributes explicitly when exited.""" for bp in self._build_provider: self._build_provider[bp].__del__() @property def device_status(self): """getter for self._device_status""" return self._device_status @device_status.setter def device_status(self, device_status): """setter for self._device_status""" self._device_status = device_status def InitCommandModuleParsers(self): """Init all console command modules""" for name in dir(self): if name.startswith('_Init') and name.endswith('Parser'): attr_func = getattr(self, name) if hasattr(attr_func, '__call__'): attr_func() def SetUpCommandProcessors(self): """Sets up all command processors.""" for command_processor in COMMAND_PROCESSORS: cp = command_processor() cp._SetUp(self) do_text = "do_%s" % cp.command help_text = "help_%s" % cp.command setattr(self, do_text, cp._Run) setattr(self, help_text, cp._Help) self.command_processors[cp.command] = cp def TearDown(self): """Removes all command processors.""" for command_processor in self.command_processors.itervalues(): command_processor._TearDown() self.command_processors.clear() def FormatString(self, format_string): """Replaces variables with the values in the console's dictionaries. Args: format_string: The string containing variables enclosed in {}. Returns: The formatted string. Raises: KeyError if a variable is not found in the dictionaries or the value is empty. """ def ReplaceVariable(match): name = match.group(1) if name in ("build_id", "branch", "target"): value = self.fetch_info[name] elif name in ("result_zip", "suite_plan"): value = self.test_result[name] elif name in ("timestamp"): value = datetime.datetime.now().strftime("%Y%m%d-%H%M%S") else: value = None if not value: raise KeyError(name) return value return re.sub("{([^}]+)}", ReplaceVariable, format_string) def ProcessScript(self, script_file_path): """Processes a .py script file. A script file implements a function which emits a list of console commands to execute. That function emits an empty list or None if no more command needs to be processed. Args: script_file_path: string, the path of a script file (.py file). Returns: True if successful; False otherwise """ if not script_file_path.endswith(".py"): print("Script file is not .py file: %s" % script_file_path) return False script_module = imp.load_source('script_module', script_file_path) commands = script_module.EmitConsoleCommands() if commands: for command in commands: ret = self.onecmd(command) if ret == False: return False return True def ProcessConfigurableScript(self, script_file_path, **kwargs): """Processes a .py script file. A script file implements a function which emits a list of console commands to execute. That function emits an empty list or None if no more command needs to be processed. Args: script_file_path: string, the path of a script file (.py file). kwargs: extra args for the interface function defined in the script file. Returns: True if successful; False otherwise """ if script_file_path and "." not in script_file_path: script_file_path += ".py" if not script_file_path.endswith(".py"): print("Script file is not .py file: %s" % script_file_path) return False script_module = imp.load_source('script_module', script_file_path) commands = script_module.EmitConsoleCommands(**kwargs) if commands: for command in commands: ret = self.onecmd(command) if ret == False: return False else: return False return True def _Print(self, string): """Prints a string and a new line character. Args: string: The string to be printed. """ self._out_file.write(string + "\n") def _PrintObjects(self, objects, attr_names): """Shows objects as a table. Args: object: The objects to be shown, one object in a row. attr_names: The attributes to be shown, one attribute in a column. """ width = [len(name) for name in attr_names] rows = [attr_names] for dev_info in objects: attrs = [ _ToPrintString(getattr(dev_info, name, "")) for name in attr_names ] rows.append(attrs) for index, attr in enumerate(attrs): width[index] = max(width[index], len(attr)) for row in rows: self._Print(" ".join( attr.ljust(width[index]) for index, attr in enumerate(row))) def DownloadTestResources(self, request_id): """Download all of the test resources for a TFC request id. Args: request_id: int, TFC request id """ resources = self._tfc_client.TestResourceList(request_id) for resource in resources: self.DownloadTestResource(resource['url']) def DownloadTestResource(self, url): """Download a test resource with build provider, given a url. Args: url: a resource locator (not necessarily HTTP[s]) with the scheme specifying the build provider. """ parsed = urlparse.urlparse(url) path = (parsed.netloc + parsed.path).split('/') if parsed.scheme == "pab": if len(path) != 5: print("Invalid pab resource locator: %s" % url) return account_id, branch, target, build_id, artifact_name = path cmd = ("fetch" " --type=pab" " --account_id=%s" " --branch=%s" " --target=%s" " --build_id=%s" " --artifact_name=%s") % (account_id, branch, target, build_id, artifact_name) self.onecmd(cmd) elif parsed.scheme == "ab": if len(path) != 4: print("Invalid ab resource locator: %s" % url) return branch, target, build_id, artifact_name = path cmd = ("fetch" "--type=ab" " --branch=%s" " --target=%s" " --build_id=%s" " --artifact_name=%s") % (branch, target, build_id, artifact_name) self.onecmd(cmd) elif parsed.scheme == gcs: cmd = "fetch --type=gcs --path=%s" % url self.onecmd(cmd) else: print "Invalid URL: %s" % url def SetSerials(self, serials): """Sets the default serial numbers for flashing and testing. Args: serials: A list of strings, the serial numbers. """ self._serials = serials def GetSerials(self): """Returns the serial numbers saved in the console. Returns: A list of strings, the serial numbers. """ return self._serials def JobThread(self): """Job thread which monitors and uploads results.""" thread = threading.currentThread() while getattr(thread, "keep_running", True): time.sleep(1) if self._job_pool: self._job_pool.close() self._job_pool.terminate() self._job_pool.join() def StartJobThreadAndProcessPool(self): """Starts a background thread to control leased jobs.""" self._job_in_queue = multiprocessing.Queue() self._job_out_queue = multiprocessing.Queue() self._job_pool = NonDaemonizedPool( common._MAX_LEASED_JOBS, JobMain, (self._vti_address, self._job_in_queue, self._job_out_queue, self._device_status)) self._job_thread = threading.Thread(target=self.JobThread) self._job_thread.daemon = True self._job_thread.start() def StopJobThreadAndProcessPool(self): """Terminates the thread and processes that runs the leased job.""" if hasattr(self, "_job_thread"): self._job_thread.keep_running = False self._job_thread.join() # @Override def onecmd(self, line, depth=1, ret_out_queue=None): """Executes command(s) and prints any exception. Parallel execution only for 2nd-level list element. Args: line: a list of string or string which keeps the command to run. """ if not line: return if type(line) == list: if depth == 1: # 1 to use multi-threading jobs = [] ret_queue = multiprocessing.Queue() for sub_command in line: p = multiprocessing.Process( target=self.onecmd, args=( sub_command, depth + 1, ret_queue, )) jobs.append(p) p.start() for job in jobs: job.join() ret_cmd_list = True while not ret_queue.empty(): ret_from_subprocess = ret_queue.get() ret_cmd_list = ret_cmd_list and ret_from_subprocess if ret_cmd_list == False: return False else: for sub_command in line: ret_cmd_list = self.onecmd(sub_command, depth + 1) if ret_cmd_list == False and ret_out_queue: ret_out_queue.put(False) return False return print("Command: %s" % line) try: ret_cmd = cmd.Cmd.onecmd(self, line) if ret_cmd == False and ret_out_queue: ret_out_queue.put(ret_cmd) return ret_cmd except Exception as e: self._Print("%s: %s" % (type(e).__name__, e)) if ret_out_queue: ret_out_queue.put(False) return False # @Override def emptyline(self): """Ignores empty lines.""" pass # @Override def default(self, line): """Handles unrecognized commands. Returns: True if receives EOF; otherwise delegates to default handler. """ if line == "EOF": return self.do_exit(line) return cmd.Cmd.default(self, line) def _ToPrintString(obj): """Converts an object to printable string on console. Args: obj: The object to be printed. """ if isinstance(obj, (list, tuple, set)): return ",".join(str(x) for x in obj) return str(obj)