# SPDX-License-Identifier: Apache-2.0
#
# Copyright (C) 2015, ARM Limited and contributors.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from bart.common.Analyzer import Analyzer
import collections
from collections import namedtuple
import datetime
import gzip
import json
import os
import re
import time
import trappy
from devlib import TargetError

# Configure logging
import logging

# Add JSON parsing support
from conf import JsonConf

import wlgen

from devlib import TargetError

# One (workload, target configuration, iteration) tuple, plus the local
# directory where its results are stored.
Experiment = namedtuple('Experiment', ['wload_name', 'wload',
                                       'conf', 'iteration', 'out_dir'])


class Executor():
    r"""
    Abstraction for running sets of experiments and gathering data from targets

    An executor can be configured to run a set of workloads (wloads) in each
    different target configuration of a specified set (confs). These wloads and
    confs can be specified by the "experiments_conf" input dictionary. Each
    (workload, conf, iteration) tuple is called an "experiment".

    After the workloads have been run, the Executor object's `experiments`
    attribute is a list of Experiment objects. The `out_dir` attribute of these
    objects can be used to find the results of the experiment. This output
    directory follows this format:

        results/<test_id>/<wltype>:<conf>:<wload>/<run_id>

    where:

        test_id
            Is the "tid" defined by the experiments_conf, or a timestamp based
            folder in case "tid" is not specified.
        wltype
            Is the class of workload executed, e.g. rtapp or sched_perf.
        conf
            Is the "tag" of one of the specified **confs**.
        wload
            Is the identifier of one of the specified **wloads**.
        run_id
            Is the progressive execution number from 1 up to the specified
            **iterations**.

    :param test_env: Test environment the experiments run in. The executor
        uses its ``target``, ``res_dir``, ``run_dir``, ``ftrace`` and
        ``emeter`` attributes and its ``install_tools``, ``install_kernel``,
        ``platform_dump`` and ``calibration`` methods.

    :param experiments_conf: Dict with experiment configuration, or a string
        with the path of a JSON file (loaded through ``JsonConf``) holding
        such a dict. Keys are:

        **confs**
          Mandatory. Platform configurations to be tested. List of dicts,
          each with keys:

            tag
              String to identify this configuration. Required, may be empty.
            flags
              List of strings describing features required for this
              conf. Available flags are:

              "ftrace"
                  Enable collecting ftrace during the experiment.
              "freeze_userspace"
                  Use the cgroups freezer to freeze as many userspace tasks as
                  possible during the experiment execution, in order to reduce
                  system noise. Some tasks cannot be frozen, such as those
                  required to maintain a connection to LISA.

            sched_features
              Optional features to be written to
              /sys/kernel/debug/sched_features, given either as a list of
              strings or as a single comma-separated string. Prepend "NO\_"
              to a feature to actively disable it. Requires
              ``CONFIG_SCHED_DEBUG`` in target kernel.
            cpufreq
              Parameters to configure cpufreq via Devlib's cpufreq
              module. Dictionary with fields:

              .. TODO link to devlib cpufreq module docs (which don't exist)

              governor
                cpufreq governor to set (for all CPUs) before execution. The
                previous governor is not restored when execution is finished.
              params
                Dictionary of governor-specific tunables, expanded and passed
                as kwargs to the cpufreq module's ``set_governor_tunables``
                method.
              freqs
                Requires "governor" to be "userspace". Dictionary mapping CPU
                numbers to frequencies. Exact frequencies should be available
                on those CPUs. It is not necessary to provide a frequency for
                every CPU - the frequency for unspecified CPUs is not
                affected. Note that cpufreq will transparently set the
                frequencies of any other CPUs sharing a clock domain.

            files
              Optional dictionary mapping a path on the target to the value
              to be written into it. A failed write is logged and ignored,
              unless the path is prefixed with "!" (e.g. "!/proc/..."), in
              which case the failure is raised.

            cgroups
              Optional cgroups configuration. To use this, ensure the 'cgroups'
              devlib module is enabled in your test_conf Contains fields:

              .. TODO reference test_conf
              .. TODO link to devlib cgroup module's docs (which don't exist)

              conf
                Dict specifying the cgroup controllers, cgroups, and cgroup
                parameters to setup. If a controller listed here is not
                enabled in the target kernel, a message is logged and the
                configuration is **ignored**. Of the form:

                ::

                  "<controller>" : {
                      "<group1>" : { "<group_param>" : <value> }
                      "<group2>" : { "<group_param>" : <value> }
                  }

                These cgroups can then be used in the "cgroup" field of workload
                specifications.

              default
                The default cgroup to run workloads in, if no "cgroup" is
                specified.

              For example, to create a cpuset cgroup named "/big" which
              restricts constituent tasks to CPUs 1 and 2:

              ::

                "cgroups" : {
                    "conf" : {
                        "cpuset" : {
                            "/big" : {"cpus" : "1-2"},
                        }
                    },
                    "default" : "/",
                }

        **wloads**
          .. TODO document wloads field in full.

          Mandatory. Workloads to run on each platform configuration.
          Dictionary mapping a workload identifier to its specification,
          a dict with keys:

            type
              Either "rt-app" or "perf_bench".
            conf
              Dict whose "class" key selects the workload flavour
              ("profile", "periodic" or "custom" for rt-app; "messaging" or
              "pipe" for perf_bench). May also hold "cpus" to select the
              CPUs to run on.
            loadref
              Optional, forwarded to the rt-app workload configuration.
            cgroup
              Optional cgroup to run the workload in, overriding the
              configuration's default cgroup.

        **iterations**
          Number of iterations for each workload/conf combination. Default
          is 1.
    :type experiments_conf: dict

    :ivar experiments: After calling :meth:`run`, the list of
                       :class:`Experiment` s that were run

    :ivar iterations: The number of iterations run for each wload/conf pair
                      (i.e. ``experiments_conf['iterations']``).
    """

    critical_tasks = {
        'linux': ['init', 'systemd', 'sh', 'ssh', 'rsyslogd', 'jbd2'],
        'android': [
            'sh', 'adbd', 'usb', 'transport',
            # We don't actually need this task but on Google Pixel it apparently
            # cannot be frozen, so the cgroup state gets stuck in FREEZING if we
            # try to freeze it.
            'thermal-engine'
        ]
    }
    """
    Dictionary mapping OS name to list of task names that we can't afford to
    freeze when using freeze_userspace.
    """

    def __init__(self, test_env, experiments_conf):
        # Initialize globals
        self._default_cgroup = None
        self._cgroup = None
        # SELinux mode saved by _setup_rootfs and restored by _target_cleanup;
        # None means "nothing to restore".
        self._old_selinux_mode = None

        # Setup logging
        self._log = logging.getLogger('Executor')

        # Setup test configuration
        if isinstance(experiments_conf, dict):
            self._log.info('Loading custom (inline) test configuration')
            self._experiments_conf = experiments_conf
        elif isinstance(experiments_conf, str):
            self._log.info('Loading custom (file) test configuration')
            json_conf = JsonConf(experiments_conf)
            self._experiments_conf = json_conf.load()
        else:
            raise ValueError(
                'experiments_conf must be either a dictionary or a filepath')

        # Check for mandatory configurations
        if not self._experiments_conf.get('confs', None):
            raise ValueError('Configuration error: '
                             'missing "confs" definitions')
        if not self._experiments_conf.get('wloads', None):
            raise ValueError('Configuration error: '
                             'missing "wloads" definitions')

        self.te = test_env
        self.target = self.te.target

        self.iterations = self._experiments_conf.get('iterations', 1)
        # Compute total number of experiments
        self._exp_count = self.iterations \
                * len(self._experiments_conf['wloads']) \
                * len(self._experiments_conf['confs'])

        self._print_section('Experiments configuration')

        self._log.info('Configured to run:')

        self._log.info(' %3d target configurations:',
                       len(self._experiments_conf['confs']))
        target_confs = [conf['tag'] for conf in self._experiments_conf['confs']]
        target_confs = ', '.join(target_confs)
        self._log.info(' %s', target_confs)

        self._log.info(' %3d workloads (%d iterations each)',
                       len(self._experiments_conf['wloads']),
                       self.iterations)
        wload_confs = ', '.join(self._experiments_conf['wloads'])
        self._log.info(' %s', wload_confs)

        self._log.info('Total: %d experiments', self._exp_count)

        self._log.info('Results will be collected under:')
        self._log.info(' %s', self.te.res_dir)

        if any(wl['type'] == 'rt-app'
               for wl in self._experiments_conf['wloads'].values()):
            self._log.info('rt-app workloads found, installing tool on target')
            self.te.install_tools(['rt-app'])

    def run(self):
        """
        Run every configured experiment and record it in ``self.experiments``.

        For each target configuration the target is configured, then each
        workload is initialised once and executed ``self.iterations`` times.
        A configuration whose setup reports failure is skipped.
        """
        self._print_section('Experiments execution')

        self.experiments = []

        # Run all the configured experiments
        exp_idx = 0
        for tc in self._experiments_conf['confs']:
            # TARGET: configuration
            if not self._target_configure(tc):
                # The rootfs setup has already run at this point and may have
                # changed the SELinux mode: undo it before skipping this conf.
                self._target_cleanup(tc)
                continue
            for wl_idx in self._experiments_conf['wloads']:
                # TEST: configuration
                wload, test_dir = self._wload_init(tc, wl_idx)
                for itr_idx in range(1, self.iterations + 1):
                    exp = Experiment(
                        wload_name=wl_idx,
                        wload=wload,
                        conf=tc,
                        iteration=itr_idx,
                        out_dir=os.path.join(test_dir, str(itr_idx)))
                    self.experiments.append(exp)

                    # WORKLOAD: execution
                    self._wload_run(exp_idx, exp)
                    exp_idx += 1
            self._target_cleanup(tc)

        self._print_section('Experiments execution completed')
        self._log.info('Results available in:')
        self._log.info(' %s', self.te.res_dir)


################################################################################
# Target Configuration
################################################################################

    def _cgroups_init(self, tc):
        """
        Check that every cgroup controller requested by ``tc`` exists.

        Resets the default cgroup as a side effect.

        :returns: True if ``tc`` has no "cgroups" entry or all the requested
                  controllers are available, False otherwise.
        :raises RuntimeError: if ``tc`` requests cgroups but the "cgroups"
                              module is not loaded on the target.
        """
        self._default_cgroup = None
        if 'cgroups' not in tc:
            return True
        if 'cgroups' not in self.target.modules:
            raise RuntimeError('CGroups module not available. Please ensure '
                               '"cgroups" is listed in your target/test modules')
        self._log.info('Initialize CGroups support...')
        errors = False
        for kind in tc['cgroups']['conf']:
            self._log.info('Setup [%s] CGroup controller...', kind)
            controller = self.target.cgroups.controller(kind)
            if not controller:
                self._log.warning('CGroups controller [%s] NOT available',
                                  kind)
                errors = True
        return not errors

    def _setup_kernel(self, tc):
        """Install the kernel required by ``tc`` and prepare the rootfs."""
        # Deploy kernel on the device
        self.te.install_kernel(tc, reboot=True)
        # Setup the rootfs for the experiments
        self._setup_rootfs(tc)

    def _setup_sched_features(self, tc):
        """
        Write the scheduler features requested by ``tc`` to the target.

        ``tc['sched_features']`` may be a list/tuple of feature names or a
        single comma-separated string. Nothing is done when the key is absent.
        """
        if 'sched_features' not in tc:
            self._log.debug('Scheduler features configuration not provided')
            return
        feats = tc['sched_features']
        # Anything that is not already a sequence of names is treated as the
        # historical comma-separated string format.
        if not isinstance(feats, (list, tuple)):
            feats = feats.split(",")
        for feat in feats:
            self._log.info('Set scheduler feature: %s', feat)
            self.target.execute(
                'echo {} > /sys/kernel/debug/sched_features'.format(feat),
                as_root=True)

    def _setup_rootfs(self, tc):
        """
        Prepare the target filesystem for the experiments of ``tc``.

        Checks cgroups support, creates the run folder as a tmpfs mount and
        tries to put SELinux in permissive mode, remembering the previous
        mode in ``self._old_selinux_mode`` (None if it could not be changed).
        """
        # Initialize CGroups if required
        self._cgroups_init(tc)

        # Setup target folder for experiments execution
        self.te.run_dir = os.path.join(
                self.target.working_directory, TGT_RUN_DIR)
        # Create run folder as tmpfs
        self._log.debug('Setup RT-App run folder [%s]...', self.te.run_dir)
        self.target.execute('[ -d {0} ] || mkdir {0}'\
                .format(self.te.run_dir))
        self.target.execute(
                'grep schedtest /proc/mounts || '\
                ' mount -t tmpfs -o size=1024m {} {}'\
                .format('schedtest', self.te.run_dir),
                as_root=True)
        # tmpfs mounts have an SELinux context with "tmpfs" as the type (while
        # other files we create have "shell_data_file"). That prevents non-root
        # users from creating files in tmpfs mounts. For now, just put SELinux
        # in permissive mode to get around that.
        try:
            # First, save the old SELinux mode
            self._old_selinux_mode = self.target.execute('getenforce')
            self._log.warning('Setting target SELinux in permissive mode')
            self.target.execute('setenforce 0', as_root=True)
        except TargetError:
            # Probably the target doesn't have SELinux, or there are no
            # contexts set up. No problem.
            self._log.warning("Couldn't set SELinux in permissive mode. "
                              "This is probably fine.")
            self._old_selinux_mode = None

    def _setup_cpufreq(self, tc):
        """
        Apply the cpufreq configuration of ``tc``, if any.

        Sets the governor on all CPUs, then the per-CPU frequencies listed in
        "freqs" and the governor tunables listed in "params".

        :raises ValueError: if "freqs" is given with a governor other than
                            "userspace".
        """
        if 'cpufreq' not in tc:
            self._log.warning('cpufreq governor not specified, '
                              'using currently configured governor')
            return

        cpufreq = tc['cpufreq']
        self._log.info('Configuring all CPUs to use [%s] cpufreq governor',
                       cpufreq['governor'])

        self.target.cpufreq.set_all_governors(cpufreq['governor'])

        if 'freqs' in cpufreq:
            if cpufreq['governor'] != 'userspace':
                raise ValueError('Must use userspace governor to set CPU freqs')
            self._log.info(r'%14s - CPU frequencies: %s',
                           'CPUFreq', str(cpufreq['freqs']))
            for cpu, freq in cpufreq['freqs'].items():
                self.target.cpufreq.set_frequency(cpu, freq)

        if 'params' in cpufreq:
            self._log.info('governor params: %s', str(cpufreq['params']))
            for cpu in self.target.list_online_cpus():
                self.target.cpufreq.set_governor_tunables(
                        cpu,
                        cpufreq['governor'],
                        **cpufreq['params'])

    def _setup_cgroups(self, tc):
        """
        Create and configure the cgroups requested by ``tc``.

        Also records ``tc['cgroups']['default']`` as the default cgroup for
        workloads.

        :returns: False if any requested controller is not supported by the
                  target, True otherwise.
        """
        if 'cgroups' not in tc:
            return True
        # Setup default CGroup to run tasks into
        if 'default' in tc['cgroups']:
            self._default_cgroup = tc['cgroups']['default']
        # Configure each required controller
        if 'conf' not in tc['cgroups']:
            return True
        errors = False
        for kind in tc['cgroups']['conf']:
            controller = self.target.cgroups.controller(kind)
            if not controller:
                self._log.warning('Configuration error: '
                                  '[%s] contoller NOT supported',
                                  kind)
                errors = True
                continue
            self._setup_controller(tc, controller)
        return not errors

    def _setup_controller(self, tc, controller):
        """
        Configure every group that ``tc`` requests for ``controller``.

        :returns: False if any group could not be obtained, True otherwise.
        :raises ValueError: if a group name does not start with "/".
        """
        kind = controller.kind
        # Configure each required groups for that controller
        errors = False
        for name in tc['cgroups']['conf'][controller.kind]:
            if name[0] != '/':
                raise ValueError('Wrong CGroup name [{}]. '
                                 'CGroups names must start by "/".'
                                 .format(name))
            group = controller.cgroup(name)
            if not group:
                self._log.warning('Configuration error: '
                                  '[%s/%s] cgroup NOT available',
                                  kind, name)
                errors = True
                continue
            self._setup_group(tc, group)
        return not errors

    def _setup_group(self, tc, group):
        """Apply to ``group`` the attributes that ``tc`` specifies for it."""
        kind = group.controller.kind
        name = group.name
        # Configure each required attribute
        group.set(**tc['cgroups']['conf'][kind][name])

    def _setup_files(self, tc):
        """
        Write the values listed in ``tc['files']`` to files on the target.

        Keys are target paths, values the content to write. A failed write is
        logged and ignored unless the path was prefixed with "!", in which
        case the ``TargetError`` is re-raised.

        :returns: True once all files have been processed (or there were
                  none to process).
        """
        if 'files' not in tc:
            self._log.debug('\'files\' Configuration block not provided')
            return True
        for name, value in tc['files'].items():
            check = False
            # A leading "!" marks the write as mandatory; strip it to get the
            # actual path.
            if name.startswith('!/'):
                check = True
                name = name[1:]
            self._log.info('File Write(check=%s): \'%s\' -> \'%s\'',
                           check, value, name)
            try:
                self.target.write_value(name, value, True)
            except TargetError:
                self._log.info('File Write Failed: \'%s\' -> \'%s\'',
                               value, name)
                if check:
                    raise
        return True

    def _target_configure(self, tc):
        """
        Apply the whole target configuration ``tc``.

        :returns: the result of the cgroups setup; False means the
                  configuration cannot be honoured and should be skipped.
        """
        self._print_header(
                'configuring target for [{}] experiments'\
                .format(tc['tag']))
        self._setup_kernel(tc)
        self._setup_sched_features(tc)
        self._setup_cpufreq(tc)
        self._setup_files(tc)
        return self._setup_cgroups(tc)

    def _target_conf_flag(self, tc, flag):
        """Return True if ``flag`` is listed in the "flags" of ``tc``."""
        has_flag = flag in tc.get('flags', [])
        self._log.debug('Check if target configuration [%s] has flag [%s]: %s',
                        tc['tag'], flag, has_flag)
        return has_flag

    def _target_cleanup(self, tc):
        """Restore the SELinux mode saved while configuring the target."""
        if self._old_selinux_mode is not None:
            self._log.info('Restoring target SELinux mode: %s',
                           self._old_selinux_mode)
            self.target.execute('setenforce ' + self._old_selinux_mode,
                                as_root=True)
            # Nothing left to restore until the next configuration saves a
            # mode again.
            self._old_selinux_mode = None

################################################################################
# Workload Setup and Execution
################################################################################

    def _wload_cpus(self, wl_idx, wlspec):
        """
        Translate the "cpus" entry of a workload specification into CPU ids.

        Accepted values are a list of CPU ids, a single int, or a string.
        On targets without the "bl" module any string selects the online
        CPUs (only the first/last one if the string contains "first"/"last").
        On big.LITTLE targets the string must start with "littles" or "bigs"
        and may likewise contain "first" or "last".

        :returns: list of CPU ids, or None if no "cpus" entry is present.
        :raises ValueError: for an unsupported string on a big.LITTLE target.
        """
        if 'cpus' not in wlspec['conf']:
            return None
        cpus = wlspec['conf']['cpus']

        if isinstance(cpus, list):
            return cpus
        if isinstance(cpus, int):
            return [cpus]

        # SMP target (or not bL module loaded)
        if not hasattr(self.target, 'bl'):
            if 'first' in cpus:
                return [self.target.list_online_cpus()[0]]
            if 'last' in cpus:
                return [self.target.list_online_cpus()[-1]]
            return self.target.list_online_cpus()

        # big.LITTLE target
        if cpus.startswith('littles'):
            if 'first' in cpus:
                return [self.target.bl.littles_online[0]]
            if 'last' in cpus:
                return [self.target.bl.littles_online[-1]]
            return self.target.bl.littles_online
        if cpus.startswith('bigs'):
            if 'first' in cpus:
                return [self.target.bl.bigs_online[0]]
            if 'last' in cpus:
                return [self.target.bl.bigs_online[-1]]
            return self.target.bl.bigs_online
        raise ValueError('unsupported [{}] "cpus" value for [{}] '
                         'workload specification'
                         .format(cpus, wl_idx))

    def _wload_task_idxs(self, wl_idx, tasks):
        """
        Return the task indexes for a "tasks" specification.

        ``tasks`` is either an int (that many tasks) or one of "cpus",
        "little" or "big", meaning one task per core of that kind.

        :raises ValueError: for any other value.
        """
        if isinstance(tasks, int):
            return range(tasks)
        if tasks == 'cpus':
            return range(len(self.target.core_names))
        if tasks == 'little':
            return range(len([t
                for t in self.target.core_names
                if t == self.target.little_core]))
        if tasks == 'big':
            return range(len([t
                for t in self.target.core_names
                if t == self.target.big_core]))
        raise ValueError('unsupported "tasks" value for [{}] RT-App '
                         'workload specification'
                         .format(wl_idx))

    def _wload_rtapp(self, wl_idx, wlspec, cpus):
        """
        Build the rt-app workload described by ``wlspec``.

        Supported ``conf['class']`` values are "profile", "periodic" and
        "custom". Note that a missing "prefix" is added to ``wlspec['conf']``
        in place.

        :returns: the configured ``wlgen.RTA`` object.
        :raises ValueError: for an unsupported class or task kind.
        """
        conf = wlspec['conf']
        self._log.debug('Configuring [%s] rt-app...', conf['class'])

        # Setup a default "empty" task name prefix
        if 'prefix' not in conf:
            conf['prefix'] = 'task_'

        # Setup a default loadref CPU
        loadref = None
        if 'loadref' in wlspec:
            loadref = wlspec['loadref']

        if conf['class'] == 'profile':
            params = {}
            # Load each task specification
            for task_name, task in conf['params'].items():
                if task['kind'] not in wlgen.__dict__:
                    self._log.error('RTA task of kind [%s] not supported',
                                    task['kind'])
                    raise ValueError('unsupported "kind" value for task [{}] '
                                     'in RT-App workload specification'
                                     .format(task))
                task_ctor = getattr(wlgen, task['kind'])
                num_tasks = task.get('tasks', 1)
                task_idxs = self._wload_task_idxs(wl_idx, num_tasks)
                for idx in task_idxs:
                    # Only number the tasks when there is more than one
                    idx_name = "_{}".format(idx) if len(task_idxs) > 1 else ""
                    task_name_idx = conf['prefix'] + task_name + idx_name
                    params[task_name_idx] = task_ctor(**task['params']).get()

            rtapp = wlgen.RTA(self.target,
                              wl_idx, calibration=self.te.calibration())
            rtapp.conf(kind='profile', params=params, loadref=loadref,
                       cpus=cpus, run_dir=self.te.run_dir,
                       duration=conf.get('duration'))
            return rtapp

        if conf['class'] == 'periodic':
            task_idxs = self._wload_task_idxs(wl_idx, conf['tasks'])
            params = {}
            for idx in task_idxs:
                task = conf['prefix'] + str(idx)
                params[task] = wlgen.Periodic(**conf['params']).get()
            rtapp = wlgen.RTA(self.target,
                              wl_idx, calibration=self.te.calibration())
            rtapp.conf(kind='profile', params=params, loadref=loadref,
                       cpus=cpus, run_dir=self.te.run_dir,
                       duration=conf.get('duration'))
            return rtapp

        if conf['class'] == 'custom':
            rtapp = wlgen.RTA(self.target,
                              wl_idx, calibration=self.te.calibration())
            rtapp.conf(kind='custom',
                       params=conf['json'],
                       duration=conf.get('duration'),
                       loadref=loadref,
                       cpus=cpus, run_dir=self.te.run_dir)
            return rtapp

        raise ValueError('unsupported \'class\' value for [{}] '
                         'RT-App workload specification'
                         .format(wl_idx))

    def _wload_perf_bench(self, wl_idx, wlspec, cpus):
        """
        Build the perf bench workload described by ``wlspec``.

        Supported ``conf['class']`` values are "messaging" and "pipe".
        ``cpus`` is accepted for symmetry with the other builders but unused.

        :raises ValueError: for an unsupported class.
        """
        conf = wlspec['conf']
        self._log.debug('Configuring perf_message...')

        if conf['class'] == 'messaging':
            perf_bench = wlgen.PerfMessaging(self.target, wl_idx)
            perf_bench.conf(**conf['params'])
            return perf_bench

        if conf['class'] == 'pipe':
            perf_bench = wlgen.PerfPipe(self.target, wl_idx)
            perf_bench.conf(**conf['params'])
            return perf_bench

        raise ValueError('unsupported "class" value for [{}] '
                         'perf bench workload specification'
                         .format(wl_idx))

    def _wload_conf(self, wl_idx, wlspec):
        """
        Build the workload object for ``wlspec`` and select its cgroup.

        :raises RuntimeError: if the workload asks for a cgroup but the
                              "cgroups" module is not loaded on the target.
        :raises ValueError: for an unsupported workload "type".
        """
        # CPUS: setup execution on CPUs if required by configuration
        cpus = self._wload_cpus(wl_idx, wlspec)

        # CGroup: setup CGroups if required by configuration
        self._cgroup = self._default_cgroup
        if 'cgroup' in wlspec:
            if 'cgroups' not in self.target.modules:
                raise RuntimeError('Target not supporting CGroups or CGroups '
                                   'not configured for the current test configuration')
            self._cgroup = wlspec['cgroup']

        if wlspec['type'] == 'rt-app':
            return self._wload_rtapp(wl_idx, wlspec, cpus)
        if wlspec['type'] == 'perf_bench':
            return self._wload_perf_bench(wl_idx, wlspec, cpus)

        raise ValueError('unsupported "type" value for [{}] '
                         'workload specification'
                         .format(wl_idx))

    def _wload_init(self, tc, wl_idx):
        """
        Configure workload ``wl_idx`` and create its results folder.

        The folder receives a platform dump plus the target kernel
        configuration and version.

        :returns: tuple (workload object, results folder path).
        """
        tc_idx = tc['tag']

        # Configure the test workload
        wlspec = self._experiments_conf['wloads'][wl_idx]
        wload = self._wload_conf(wl_idx, wlspec)

        # Keep track of platform configuration
        test_dir = '{}/{}:{}:{}'\
            .format(self.te.res_dir, wload.wtype, tc_idx, wl_idx)
        os.makedirs(test_dir)
        self.te.platform_dump(test_dir)

        # Keep track of kernel configuration and version
        config = self.target.config
        with gzip.open(os.path.join(test_dir, 'kernel.config'), 'wb') as fh:
            fh.write(config.text)
        output = self.target.execute('{} uname -a'\
                .format(self.target.busybox))
        with open(os.path.join(test_dir, 'kernel.version'), 'w') as fh:
            fh.write(output)

        return wload, test_dir

    def _wload_run(self, exp_idx, experiment):
        """
        Execute one experiment and collect its results in its ``out_dir``.

        :param exp_idx: zero-based index of the experiment in the whole run
        :param experiment: the :class:`Experiment` to execute
        """
        tc = experiment.conf
        wload = experiment.wload
        tc_idx = tc['tag']

        # exp_idx is zero-based: show it one-based, like the iteration number
        self._print_title('Experiment {}/{}, [{}:{}] {}/{}'\
                .format(exp_idx + 1, self._exp_count,
                        tc_idx, experiment.wload_name,
                        experiment.iteration, self.iterations))

        # Setup local results folder
        self._log.debug('out_dir set to [%s]', experiment.out_dir)
        if not os.path.isdir(experiment.out_dir):
            os.makedirs(experiment.out_dir)

        # Freeze all userspace tasks that we don't need for running tests
        need_thaw = False
        if self._target_conf_flag(tc, 'freeze_userspace'):
            need_thaw = self._freeze_userspace()

        # FTRACE: start (if a configuration has been provided)
        if self.te.ftrace and self._target_conf_flag(tc, 'ftrace'):
            self._log.warning('FTrace events collection enabled')
            self.te.ftrace.start()

        # ENERGY: start sampling
        if self.te.emeter:
            self.te.emeter.reset()

        # WORKLOAD: Run the configured workload
        wload.run(out_dir=experiment.out_dir, cgroup=self._cgroup)

        # ENERGY: collect measurements
        if self.te.emeter:
            self.te.emeter.report(experiment.out_dir)

        # FTRACE: stop and collect measurements
        if self.te.ftrace and self._target_conf_flag(tc, 'ftrace'):
            self.te.ftrace.stop()

            trace_file = experiment.out_dir + '/trace.dat'
            self.te.ftrace.get_trace(trace_file)
            self._log.info('Collected FTrace binary trace:')
            self._log.info(' %s',
                           trace_file.replace(self.te.res_dir, '<res_dir>'))

            stats_file = experiment.out_dir + '/trace_stat.json'
            self.te.ftrace.get_stats(stats_file)
            self._log.info('Collected FTrace function profiling:')
            self._log.info(' %s',
                           stats_file.replace(self.te.res_dir, '<res_dir>'))

        # Unfreeze the tasks we froze
        if need_thaw:
            self._thaw_userspace()

        self._print_footer()

    def _freeze_userspace(self):
        """
        Freeze all userspace tasks except the critical ones for the target OS.

        :returns: True if tasks were frozen (and must be thawed later),
                  False if the target has no freezer controller.
        :raises RuntimeError: if the "cgroups" module is not loaded.
        """
        if 'cgroups' not in self.target.modules:
            raise RuntimeError(
                'Failed to freeze userspace. Ensure "cgroups" module is listed '
                'among modules in target/test configuration')
        controllers = [s.name for s in self.target.cgroups.list_subsystems()]
        if 'freezer' not in controllers:
            self._log.warning('No freezer cgroup controller on target. '
                              'Not freezing userspace')
            return False

        exclude = self.critical_tasks[self.te.target.os]
        self._log.info('Freezing all tasks except: %s', ','.join(exclude))
        self.te.target.cgroups.freeze(exclude)
        return True

    def _thaw_userspace(self):
        """Un-freeze the tasks frozen by :meth:`_freeze_userspace`."""
        self._log.info('Un-freezing userspace tasks')
        self.te.target.cgroups.freeze(thaw=True)

################################################################################
# Utility Functions
################################################################################

    def _print_section(self, message):
        """Log ``message`` framed by section separators."""
        self._log.info('')
        self._log.info(FMT_SECTION)
        self._log.info(message)
        self._log.info(FMT_SECTION)

    def _print_header(self, message):
        """Log ``message`` preceded by a header separator."""
        self._log.info('')
        self._log.info(FMT_HEADER)
        self._log.info(message)

    def _print_title(self, message):
        """Log ``message`` preceded by a title separator."""
        self._log.info(FMT_TITLE)
        self._log.info(message)

    def _print_footer(self, message=None):
        """Log the optional ``message`` followed by a footer separator."""
        if message:
            self._log.info(message)
        self._log.info(FMT_FOOTER)


################################################################################
# Globals
################################################################################

# Regular expression for comments
JSON_COMMENTS_RE = re.compile(
    r'(^)?[^\S\n]*/(?:\*(.*?)\*/[^\S\n]*|/[^\n]*)($)?',
    re.DOTALL | re.MULTILINE
)

# Target specific paths
TGT_RUN_DIR = 'run_dir'

# Logging formatters
FMT_SECTION = r'{:#<80}'.format('')
FMT_HEADER = r'{:=<80}'.format('')
FMT_TITLE = r'{:~<80}'.format('')
FMT_FOOTER = r'{:-<80}'.format('')

# vim :set tabstop=4 shiftwidth=4 expandtab