#!/usr/bin/env python2 # Copyright 2017 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Load generator for devserver. Example usage: # Find DUTs in suites pool to test with: atest host list -b 'pool:suites,board:BOARD' --unlocked -s Ready # Lock DUTs: atest host mod -l -r 'quick provision testing' DUT1 DUT2 # Create config file with DUTs to test and builds to use. cat >config.json <<EOD { "BOARD": { "duts": [ "chromeosX-rowY-rackZ-hostA", "chromeosX-rowY-rackZ-hostB", ], "versions": [ "auron_paine-paladin/R65-10208.0.0-rc2", "auron_paine-paladin/R65-10208.0.0-rc3", "auron_paine-paladin/R65-10209.0.0-rc1" ] }, } EOD # Do 100 total provisions, aiming to have 10 active simultaneously. loadtest.py $DS config.json --simultaneous 10 --total 100 # Unlock DUTs: atest host mod -u DUT1 DUT2 """ import collections import datetime import json import random import re import signal import subprocess import sys import time import common from autotest_lib.client.common_lib import time_utils from autotest_lib.client.common_lib.cros import dev_server from chromite.lib import commandline from chromite.lib import cros_logging as logging from chromite.lib import locking from chromite.lib import parallel # Paylods to stage. PAYLOADS = ['quick_provision', 'stateful'] # Number of seconds between full status checks. STATUS_POLL_SECONDS = 2 # Number of successes/failures to blacklist a DUT. BLACKLIST_CONSECUTIVE_FAILURE = 2 BLACKLIST_TOTAL_SUCCESS = 0 BLACKLIST_TOTAL_FAILURE = 5 def get_parser(): """Creates the argparse parser.""" parser = commandline.ArgumentParser(description=__doc__) parser.add_argument('server', type=str, action='store', help='Devserver to load test.') parser.add_argument('config', type=str, action='store', help='Path to JSON config file.' 'Config file is indexed by board with keys of ' '"duts" and "versions", each a list.') parser.add_argument('--blacklist-consecutive', '-C', type=int, action='store', help=('Consecutive number of failures before ' 'blacklisting DUT (default %d).') % BLACKLIST_CONSECUTIVE_FAILURE, default=BLACKLIST_CONSECUTIVE_FAILURE) parser.add_argument('--blacklist-success', '-S', type=int, action='store', help=('Total number of successes before blacklisting ' 'DUT (default %d).') % BLACKLIST_TOTAL_SUCCESS, default=BLACKLIST_TOTAL_SUCCESS) parser.add_argument('--blacklist-total', '-T', type=int, action='store', help=('Total number of failures before blacklisting ' 'DUT (default %d).') % BLACKLIST_TOTAL_FAILURE, default=BLACKLIST_TOTAL_FAILURE) parser.add_argument('--boards', '-b', type=str, action='store', help='Comma-separated list of boards to provision.') parser.add_argument('--dryrun', '-n', action='store_true', dest='dryrun', help='Do not attempt to provision.') parser.add_argument('--duts', '-d', type=str, action='store', help='Comma-separated list of duts to provision.') parser.add_argument('--outputlog', '-l', type=str, action='store', help='Path to append JSON entries to.') parser.add_argument('--output', '-o', type=str, action='store', help='Path to write JSON file to.') parser.add_argument('--ping', '-p', action='store_true', help='Ping DUTs and blacklist unresponsive ones.') parser.add_argument('--simultaneous', '-s', type=int, action='store', help='Number of simultaneous provisions to run.', default=1) parser.add_argument('--no-stage', action='store_false', dest='stage', default=True, help='Do not attempt to stage builds.') parser.add_argument('--total', '-t', type=int, action='store', help='Number of total provisions to run.', default=0) return parser def make_entry(entry_id, name, status, start_time, finish_time=None, parent=None, **kwargs): """Generate an event log entry to be stored in Cloud Datastore. @param entry_id: A (Kind, id) tuple representing the key. @param name: A string identifying the event @param status: A string identifying the status of the event. @param start_time: A datetime of the start of the event. @param finish_time: A datetime of the finish of the event. @param parent: A (Kind, id) tuple representing the parent key. @return A dictionary representing the entry suitable for dumping via JSON. """ entry = { 'id': entry_id, 'name': name, 'status': status, 'start_time': time_utils.to_epoch_time(start_time), } if finish_time is not None: entry['finish_time'] = time_utils.to_epoch_time(finish_time) if parent is not None: entry['parent'] = parent return entry class Job(object): """Tracks a single provision job.""" def __init__(self, ds, host_name, build_name, entry_id=0, parent=None, board=None, start_active=0, force_update=False, full_update=False, clobber_stateful=True, quick_provision=True, ping=False, dryrun=False): self.ds = ds self.host_name = host_name self.build_name = build_name self.entry_id = ('Job', entry_id) self.parent = parent self.board = board self.start_active = start_active self.end_active = None self.check_active_sum = 0 self.check_active_count = 0 self.start_time = datetime.datetime.now() self.finish_time = None self.trigger_response = None self.ping = ping self.pre_ping = None self.post_ping = None self.kwargs = { 'host_name': host_name, 'build_name': build_name, 'force_update': force_update, 'full_update': full_update, 'clobber_stateful': clobber_stateful, 'quick_provision': quick_provision, } if dryrun: self.finish_time = datetime.datetime.now() self.raised_error = None self.success = True self.pid = 0 else: if self.ping: self.pre_ping = ping_dut(self.host_name) self.trigger_response = ds._trigger_auto_update(**self.kwargs) def as_entry(self): """Generate an entry for exporting to datastore.""" entry = make_entry(self.entry_id, self.host_name, 'pass' if self.success else 'fail', self.start_time, self.finish_time, self.parent) entry.update({ 'build_name': self.build_name, 'board': self.board, 'devserver': self.ds.hostname, 'start_active': self.start_active, 'end_active': self.end_active, 'force_update': self.kwargs['force_update'], 'full_update': self.kwargs['full_update'], 'clobber_stateful': self.kwargs['clobber_stateful'], 'quick_provision': self.kwargs['quick_provision'], 'elapsed': int(self.elapsed().total_seconds()), 'trigger_response': self.trigger_response, 'pre_ping': self.pre_ping, 'post_ping': self.post_ping, }) if self.check_active_count: entry['avg_active'] = (self.check_active_sum / self.check_active_count) return entry def check(self, active_count): """Checks if a job has completed. @param active_count: Number of active provisions at time of the check. @return: True if the job has completed, False otherwise. """ if self.finish_time is not None: return True self.check_active_sum += active_count self.check_active_count += 1 finished, raised_error, pid = self.ds.check_for_auto_update_finished( self.trigger_response, wait=False, **self.kwargs) if finished: self.finish_time = datetime.datetime.now() self.raised_error = raised_error self.success = self.raised_error is None self.pid = pid self.end_active = active_count if self.ping: self.post_ping = ping_dut(self.host_name) return finished def elapsed(self): """Determine the elapsed time of the task.""" finish_time = self.finish_time or datetime.datetime.now() return finish_time - self.start_time class Runner(object): """Parallel provision load test runner.""" def __init__(self, ds, duts, config, simultaneous=1, total=0, outputlog=None, ping=False, blacklist_consecutive=None, blacklist_success=None, blacklist_total=None, dryrun=False): self.ds = ds self.duts = duts self.config = config self.start_time = datetime.datetime.now() self.finish_time = None self.simultaneous = simultaneous self.total = total self.outputlog = outputlog self.ping = ping self.blacklist_consecutive = blacklist_consecutive self.blacklist_success = blacklist_success self.blacklist_total = blacklist_total self.dryrun = dryrun self.active = [] self.started = 0 self.completed = [] # Track DUTs which have failed multiple times. self.dut_blacklist = set() # Track versions of each DUT to provision in order. self.last_versions = {} # id for the parent entry. # TODO: This isn't the most unique. self.entry_id = ('Runner', int(time_utils.to_epoch_time(datetime.datetime.now()))) # ids for the job entries. self.next_id = 0 if self.outputlog: dump_entries_as_json([self.as_entry()], self.outputlog) def signal_handler(self, signum, frame): """Signal handle to dump current status.""" logging.info('Received signal %s', signum) if signum == signal.SIGUSR1: now = datetime.datetime.now() logging.info('%d active provisions, %d completed provisions, ' '%s elapsed:', len(self.active), len(self.completed), now - self.start_time) for job in self.active: logging.info(' %s -> %s, %s elapsed', job.host_name, job.build_name, now - job.start_time) def as_entry(self): """Generate an entry for exporting to datastore.""" entry = make_entry(self.entry_id, 'Runner', 'pass', self.start_time, self.finish_time) entry.update({ 'devserver': self.ds.hostname, }) return entry def get_completed_entries(self): """Retrieves all completed jobs as entries for datastore.""" entries = [self.as_entry()] entries.extend([job.as_entry() for job in self.completed]) return entries def get_next_id(self): """Get the next Job id.""" entry_id = self.next_id self.next_id += 1 return entry_id def spawn(self, host_name, build_name): """Spawn a single provision job.""" job = Job(self.ds, host_name, build_name, entry_id=self.get_next_id(), parent=self.entry_id, board=self.get_dut_board_type(host_name), start_active=len(self.active), ping=self.ping, dryrun=self.dryrun) self.active.append(job) logging.info('Provision (%d) of %s to %s started', job.entry_id[1], job.host_name, job.build_name) self.last_versions[host_name] = build_name self.started += 1 def replenish(self): """Replenish the number of active provisions to match goals.""" while ((self.simultaneous == 0 or len(self.active) < self.simultaneous) and (self.total == 0 or self.started < self.total)): host_name = self.find_idle_dut() if host_name: build_name = self.find_build_for_dut(host_name) self.spawn(host_name, build_name) elif self.simultaneous: logging.warn('Insufficient DUTs to satisfy goal') return False else: return len(self.active) > 0 return True def check_all(self): """Check the status of outstanding provisions.""" still_active = [] for job in self.active: if job.check(len(self.active)): logging.info('Provision (%d) of %s to %s %s in %s: %s', job.entry_id[1], job.host_name, job.build_name, 'completed' if job.success else 'failed', job.elapsed(), job.raised_error) entry = job.as_entry() logging.debug(json.dumps(entry)) if self.outputlog: dump_entries_as_json([entry], self.outputlog) self.completed.append(job) if self.should_blacklist(job.host_name): logging.error('Blacklisting DUT %s', job.host_name) self.dut_blacklist.add(job.host_name) else: still_active.append(job) self.active = still_active def should_blacklist(self, host_name): """Determines if a given DUT should be blacklisted.""" jobs = [job for job in self.completed if job.host_name == host_name] total = 0 consecutive = 0 successes = 0 for job in jobs: if not job.success: total += 1 consecutive += 1 if ((self.blacklist_total is not None and total >= self.blacklist_total) or (self.blacklist_consecutive is not None and consecutive >= self.blacklist_consecutive)): return True else: successes += 1 if (self.blacklist_success is not None and successes >= self.blacklist_success): return True consecutive = 0 return False def find_idle_dut(self): """Find an idle DUT to provision..""" active_duts = {job.host_name for job in self.active} idle_duts = [d for d in self.duts if d not in active_duts | self.dut_blacklist] return random.choice(idle_duts) if len(idle_duts) else None def get_dut_board_type(self, host_name): """Determine the board type of a DUT.""" return self.duts[host_name] def get_board_versions(self, board): """Determine the versions to provision for a board.""" return self.config[board]['versions'] def find_build_for_dut(self, host_name): """Determine a build to provision on a DUT.""" board = self.get_dut_board_type(host_name) versions = self.get_board_versions(board) last_version = self.last_versions.get(host_name) try: last_index = versions.index(last_version) except ValueError: return versions[0] return versions[(last_index + 1) % len(versions)] def stage(self, build): logging.debug('Staging %s', build) self.ds.stage_artifacts(build, PAYLOADS) def stage_all(self): """Stage all necessary artifacts.""" boards = set(self.duts.values()) logging.info('Staging for %d boards', len(boards)) funcs = [] for board in boards: for build in self.get_board_versions(board): funcs.append(lambda build_=build: self.stage(build_)) parallel.RunParallelSteps(funcs) def loop(self): """Run the main provision loop.""" # Install a signal handler for status updates. old_handler = signal.signal(signal.SIGUSR1, self.signal_handler) signal.siginterrupt(signal.SIGUSR1, False) try: while True: self.check_all() if self.total != 0 and len(self.completed) >= self.total: break if not self.replenish() and len(self.active) == 0: logging.error('Unable to replenish with no active ' 'provisions') return False logging.debug('%d provisions active', len(self.active)) time.sleep(STATUS_POLL_SECONDS) return True except KeyboardInterrupt: return False finally: self.finish_time = datetime.datetime.now() # Clean up signal handler. signal.signal(signal.SIGUSR1, old_handler) def elapsed(self): """Determine the elapsed time of the task.""" finish_time = self.finish_time or datetime.datetime.now() return finish_time - self.start_time def dump_entries_as_json(entries, output_file): """Dump event log entries as json to a file. @param entries: A list of event log entries to dump. @param output_file: The file to write to. """ # Write the entries out as JSON. logging.debug('Dumping %d entries' % len(entries)) for e in entries: json.dump(e, output_file, sort_keys=True) output_file.write('\n') output_file.flush() def ping_dut(hostname): """Checks if a host is responsive to pings.""" if re.match('^chromeos\d+-row\d+-rack\d+-host\d+$', hostname): hostname += '.cros' response = subprocess.call(["/bin/ping", "-c1", "-w2", hostname], stdout=subprocess.PIPE) return response == 0 def main(argv): """Load generator for a devserver.""" parser = get_parser() options = parser.parse_args(argv) # Parse devserver. if options.server: if re.match(r'^https?://', options.server): server = options.server else: server = 'http://%s/' % options.server ds = dev_server.ImageServer(server) else: parser.print_usage() logging.error('Must specify devserver') sys.exit(1) # Parse config file and determine master list of duts and their board type, # filtering by board type if specified. duts = {} if options.config: with open(options.config, 'r') as f: config = json.load(f) boards = (options.boards.split(',') if options.boards else config.keys()) duts_specified = (set(options.duts.split(',')) if options.duts else None) for board in boards: duts.update({dut: board for dut in config[board]['duts'] if duts_specified is None or dut in duts_specified}) logging.info('Config file %s: %d boards, %d duts', options.config, len(boards), len(duts)) else: parser.print_usage() logging.error('Must specify config file') sys.exit(1) if options.ping: logging.info('Performing ping tests') duts_alive = {} for dut, board in duts.items(): if ping_dut(dut): duts_alive[dut] = board else: logging.error('Ignoring DUT %s (%s) for failing initial ' 'ping check', dut, board) duts = duts_alive logging.info('After ping tests: %d boards, %d duts', len(boards), len(duts)) # Set up the test runner and stage all the builds. outputlog = open(options.outputlog, 'a') if options.outputlog else None runner = Runner(ds, duts, config, simultaneous=options.simultaneous, total=options.total, outputlog=outputlog, ping=options.ping, blacklist_consecutive=options.blacklist_consecutive, blacklist_success=options.blacklist_success, blacklist_total=options.blacklist_total, dryrun=options.dryrun) if options.stage: runner.stage_all() # Run all the provisions. with locking.FileLock(options.config, blocking=True).lock(): completed = runner.loop() logging.info('%s in %s', 'Completed' if completed else 'Interrupted', runner.elapsed()) # Write all entries as JSON. entries = runner.get_completed_entries() if options.output: with open(options.output, 'w') as f: dump_entries_as_json(entries, f) else: dump_entries_as_json(entries, sys.stdout) logging.info('Summary: %s', dict(collections.Counter([e['status'] for e in entries if e['name'] != 'Runner']))) # List blacklisted DUTs. if runner.dut_blacklist: logging.warn('Blacklisted DUTs:') for host_name in runner.dut_blacklist: logging.warn(' %s', host_name) if __name__ == '__main__': sys.exit(main(sys.argv[1:]))