普通文本  |  580行  |  24.7 KB

# Copyright 2018 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Helper class for power measurement with telemetry devices."""

import collections
import datetime
from distutils import sysconfig
import json
import logging
import numpy
import os
import re
import string
import threading
import time

import powerlog

from servo import measure_power

from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.power import power_status
from autotest_lib.client.cros.power import power_telemetry_utils
from autotest_lib.server.cros.power import power_dashboard


def ts_processing(ts_str):
    """Parse autotest log timestamp into local time seconds since epoch.

    @param ts_str: a timestamp string from client.DEBUG file in local time zone.
    @return seconds since epoch, inserting the current year because ts_str does
            not include year. This introduces error if client side test is
            running across the turn of the year.
    """
    ts = datetime.datetime.strptime(ts_str, '%m/%d %H:%M:%S.%f ')
    # TODO(mqg): fix the wrong year at turn of the year.
    ts = ts.replace(year=datetime.datetime.today().year)
    return time.mktime(ts.timetuple()) + ts.microsecond / 1e6


class PowerTelemetryLogger(object):
    """A helper class for power autotests requiring telemetry devices.

    Telemetry: external pieces of hardware which help with measuring power
    data on the Chrome device. This is not to be confused with library
    telemetry.core, which is a required library / dependency for autotests
    involving Chrome and / or ARC. Examples of power telemetry devices include
    Servo and Sweetberry.

    This logger class detects telemetry devices connected to the DUT. It will
    then start and stop the measurement, trim the excessive power telemetry
    device data and report the data back to the workstation and the dashboard
    """

    DASHBOARD_UPLOAD_URL = 'http://chrome-power.appspot.com'
    DEFAULT_START = r'starting test\(run_once\(\)\), test details follow'
    DEFAULT_END = r'The test has completed successfully'

    def __init__(self, config, resultsdir, host):
        """Init PowerTelemetryLogger.

        @param config: the args argument from test_that in a dict. Settings for
                       power telemetry devices.
                       required data: {'test': 'test_TestName.tag'}
        @param resultsdir: path to directory where current autotest results are
                           stored, e.g. /tmp/test_that_results/
                           results-1-test_TestName.tag/test_TestName.tag/
                           results/
        @param host: CrosHost object representing the DUT.
        """
        logging.debug('%s initialize.', self.__class__.__name__)
        self._resultsdir = resultsdir
        self._host = host
        self._tagged_testname = config['test']
        self._pdash_note = config.get('pdash_note', '')

    def start_measurement(self):
        """Start power telemetry devices."""
        self._start_measurement()
        logging.info('%s starts.', self.__class__.__name__)
        self._start_ts = time.time()

    def _start_measurement(self):
        """Start power telemetry devices."""
        raise NotImplementedError('Subclasses must implement '
                '_start_measurement.')

    def end_measurement(self, client_test_dir):
        """End power telemetry devices.

        End power measurement with telemetry devices, get the power telemetry
        device data, trim the data recorded outside of actual testing, and
        upload statistics to dashboard.

        @param client_test_dir: directory of the client side test.
        """
        self._end_measurement()
        logging.info('%s finishes.', self.__class__.__name__)
        start_ts, end_ts = self._get_client_test_ts(client_test_dir)
        loggers = self._load_and_trim_data(start_ts, end_ts)
        checkpoint_logger = self._get_client_test_checkpoint_logger(
                client_test_dir)
        self._upload_data(loggers, checkpoint_logger)

    def _end_measurement(self):
        """End power telemetry devices."""
        raise NotImplementedError('Subclasses must implement _end_measurement.')

    def _get_client_test_ts(self, client_test_dir):
        """Determine the start and end timestamp for the telemetry data.

        Power telemetry devices will run through the entire autotest, including
        the overhead time, but we only need the measurements of actual testing,
        so parse logs from client side test to determine the start and end
        timestamp for the telemetry data.

        @param client_test_dir: directory of the client side test.
        @return (start_ts, end_ts)
                start_ts: the start timestamp of the client side test in seconds
                          since epoch or None.
                end_ts: the end timestamp of the client side test in seconds
                        since epoch or None.
        """
        if not os.path.isdir(client_test_dir):
            logging.error('Cannot find client side test dir %s, no need to '
                          'trim power telemetry measurements.', client_test_dir)
            return (None, None)

        # Use timestamp in client side test power_log.json as start & end
        # timestamp.
        power_log_path = os.path.join(client_test_dir, 'results',
                                      'power_log.json')
        start_ts, end_ts = self._get_power_log_ts(power_log_path)
        if start_ts and end_ts:
            self._start_ts = start_ts
            return (start_ts, end_ts)

        # Parse timestamp in client side test debug log and use as start & end
        # timestamp.
        client_test_name = os.path.basename(client_test_dir)
        debug_file_path = os.path.join(client_test_dir, 'debug',
                                       '%s.DEBUG' % client_test_name)
        start_ts, end_ts = self._get_debug_log_ts(debug_file_path)
        if start_ts:
            self._start_ts = start_ts
        return (start_ts, end_ts)

    def _get_debug_log_ts(self, debug_file_path):
        """Parse client side test start and end timestamp from debug log.

        @param debug_file_path: path to client side test debug log.
        @return (start_ts, end_ts)
                start_ts: the start timestamp of the client side test in seconds
                          since epoch or None.
                end_ts: the end timestamp of the client side test in seconds
                        since epoch or None.
        """
        default_test_events = collections.defaultdict(dict)
        custom_test_events = collections.defaultdict(dict)
        default_test_events['start']['str'] = self.DEFAULT_START
        default_test_events['end']['str'] = self.DEFAULT_END
        custom_test_events['start']['str'] = power_telemetry_utils.CUSTOM_START
        custom_test_events['end']['str'] = power_telemetry_utils.CUSTOM_END
        for event in default_test_events:
            default_test_events[event]['re'] = re.compile(r'([\d\s\./:]+).+' +
                    default_test_events[event]['str'])
            default_test_events[event]['match'] = False
        for event in custom_test_events:
            custom_test_events[event]['re'] = re.compile(r'.*' +
                    custom_test_events[event]['str'] + r'\s+([\d\.]+)')
        events_ts = {
            'start': None,
            'end': None,
        }

        try:
            with open(debug_file_path, 'r') as debug_log:

                for line in debug_log:
                    for event in default_test_events:
                        match = default_test_events[event]['re'].match(line)
                        if match:
                            default_test_events[event]['ts'] = \
                                    ts_processing(match.group(1))
                            default_test_events[event]['match'] = True
                    for event in custom_test_events:
                        match = custom_test_events[event]['re'].match(line)
                        if match:
                            custom_test_events[event]['ts'] = \
                                    float(match.group(1))

            for event in default_test_events:
                if not default_test_events[event]['match']:
                    raise error.TestWarn('Cannot find %s timestamp in client '
                                         'side test debug log.')

            for event in events_ts:
                events_ts[event] = default_test_events[event].get(
                        'ts', events_ts[event])
                events_ts[event] = custom_test_events[event].get(
                        'ts', events_ts[event])

            return (events_ts['start'], events_ts['end'])

        except Exception as exc:
            logging.warning('Client side test debug log %s does not contain '
                            'valid start and end timestamp, see exception: %s',
                            debug_file_path, exc)
            return (None, None)

    def _get_power_log_ts(self, power_log_path):
        """Parse client side test start and end timestamp from power_log.json.

        @param power_log_path: path to client side test power_log.json.
        @return (start_ts, end_ts)
                start_ts: the start timestamp of the client side test in seconds
                          since epoch or None.
                end_ts: the end timestamp of the client side test in seconds
                        since epoch or None.
        """
        try:
            with open(power_log_path, 'r') as power_log:
                power_log_str = power_log.read()
                json_decoder = json.JSONDecoder()
                power_log_obj = []

                idx = 0
                start_ts = list()
                end_ts = list()
                while idx < len(power_log_str):
                    log_entry, idx = json_decoder.raw_decode(power_log_str, idx)
                    start_ts.append(log_entry['timestamp'])
                    end_ts.append(log_entry['timestamp'] +
                                  log_entry['power']['sample_duration'] *
                                  log_entry['power']['sample_count'])

                return (min(start_ts), max(end_ts))
        except Exception as exc:
            logging.warning('Client side test power_log %s does not contain '
                            'valid start and end timestamp, see exception: %s',
                            power_log_path, exc)
            return (None, None)

    def _load_and_trim_data(self, start_ts, end_ts):
        """Load data and trim data.

        Load and format data recorded by power telemetry devices. Trim data if
        necessary.

        @param start_ts: start timestamp in seconds since epoch, None if no
                         need to trim data.
        @param end_ts: end timestamp in seconds since epoch, None if no need to
                       trim data.
        @return a list of loggers, where each logger contains raw power data and
                statistics.

        logger format:
        {
            'sample_count' : 60,
            'sample_duration' : 60,
            'data' : {
                'domain_1' : [ 111.11, 123.45 , ... , 99.99 ],
                ...
                'domain_n' : [ 3999.99, 4242.42, ... , 4567.89 ]
            },
            'average' : {
                'domain_1' : 100.00,
                ...
                'domain_n' : 4300.00
            },
            'unit' : {
                'domain_1' : 'milliwatt',
                ...
                'domain_n' : 'milliwatt'
            },
            'type' : {
                'domain_1' : 'servod',
                ...
                'domain_n' : 'servod'
            },
        }
        """
        raise NotImplementedError('Subclasses must implement '
                '_load_and_trim_data and return a list of loggers.')

    def _get_client_test_checkpoint_logger(self, client_test_dir):
        client_test_resultsdir = os.path.join(client_test_dir, 'results')
        checkpoint_logger = power_status.get_checkpoint_logger_from_file(
                resultsdir=client_test_resultsdir)
        return checkpoint_logger

    def _upload_data(self, loggers, checkpoint_logger):
        """Upload the data to dashboard.

        @param loggers: a list of loggers, where each logger contains raw power
                        data and statistics.
        """

        for logger in loggers:
            pdash = power_dashboard.PowerTelemetryLoggerDashboard(
                    logger=logger, testname=self._tagged_testname,
                    host=self._host, start_ts=self._start_ts,
                    checkpoint_logger=checkpoint_logger,
                    resultsdir=self._resultsdir,
                    uploadurl=self.DASHBOARD_UPLOAD_URL, note=self._pdash_note)
            pdash.upload()


class ServodTelemetryLogger(PowerTelemetryLogger):
    """This logger class measures power by querying a servod instance.
    """

    DEFAULT_INA_RATE = 20.0
    DEFAULT_VBAT_RATE = 60.0

    def __init__(self, config, resultsdir, host):
        """Init ServodTelemetryLogger.

        @param config: the args argument from test_that in a dict. Settings for
                       power telemetry devices.
                       required data:
                       {'test': 'test_TestName.tag',
                        'servo_host': host of servod instance,
                        'servo_port: port that the servod instance is on}
        @param resultsdir: path to directory where current autotest results are
                           stored, e.g. /tmp/test_that_results/
                           results-1-test_TestName.tag/test_TestName.tag/
                           results/
        @param host: CrosHost object representing the DUT.
        """
        super(ServodTelemetryLogger, self).__init__(config, resultsdir, host)

        self._servo_host = config['servo_host']
        self._servo_port = config['servo_port']
        self._ina_rate = float(config.get('ina_rate', self.DEFAULT_INA_RATE))
        self._vbat_rate = float(config.get('vbat_rate', self.DEFAULT_VBAT_RATE))
        self._pm = measure_power.PowerMeasurement(host=self._servo_host,
                                                  port=self._servo_port,
                                                  ina_rate=self._ina_rate,
                                                  vbat_rate=self._vbat_rate)

    def _start_measurement(self):
        """Start power measurement by querying servod."""
        setup_done = self._pm.MeasurePower()
        # Block the main thread until setup is done and measurement has started.
        setup_done.wait()

    def _end_measurement(self):
        """End querying servod."""
        self._pm.FinishMeasurement()

    def _load_and_trim_data(self, start_ts, end_ts):
        """Load data and trim data.

        Load and format data recorded by servod. Trim data if necessary.
        """
        self._pm.ProcessMeasurement(start_ts, end_ts)

        summary = self._pm.GetSummary()
        raw_data = self._pm.GetRawData()

        sample_duration = {'Onboard INA': self._ina_rate, 'EC': self._vbat_rate}
        loggers = list()

        for source in summary:
            data = {k: v for k, v in raw_data[source].iteritems()
                    if k not in ['Sample_msecs', 'time', 'timeline']}
            ave = {k: v['mean'] for k, v in summary[source].iteritems()
                   if k not in ['Sample_msecs', 'time', 'timeline']}

            logger = {
                # All data domains should have same sample count.
                'sample_count': summary[source]['time']['count'],
                'sample_duration': sample_duration[source],
                'data': data,
                'average': ave,
                # TODO(mqg): hard code the units for now because we are only
                # dealing with power so far. When we start to work with voltage
                # or current, read the units from the .json files.
                'unit': {k: 'milliwatt' for k in data},
                'type': {k: 'servod' for k in data},
            }

            loggers.append(logger)

        return loggers

    def end_measurement(self, client_test_dir):
      """In addition to the common end_measurement flow dump summaries.

      @param client_test_dir: directory of the client side test.
      """
      # Run the shared end_measurement logic first.
      super(ServodTelemetryLogger, self).end_measurement(client_test_dir)
      # At this point the PowerMeasurement unit has been processed. Dump its
      # formatted summaries into the results directory.
      power_summaries_dir = os.path.join(self._resultsdir, 'power_summaries')
      if not os.path.exists(power_summaries_dir):
        os.makedirs(power_summaries_dir)
      self._pm.SaveSummary(outdir=power_summaries_dir)

class PowerlogTelemetryLogger(PowerTelemetryLogger):
    """This logger class measures power with Sweetberry via powerlog tool.
    """

    DEFAULT_SWEETBERRY_INTERVAL = 20.0
    SWEETBERRY_CONFIG_DIR = os.path.join(
            sysconfig.get_python_lib(standard_lib=False), 'servo', 'data')

    def __init__(self, config, resultsdir, host):
        """Init PowerlogTelemetryLogger.

        @param config: the args argument from test_that in a dict. Settings for
                       power telemetry devices.
                       required data: {'test': 'test_TestName.tag'}
        @param resultsdir: path to directory where current autotest results are
                           stored, e.g. /tmp/test_that_results/
                           results-1-test_TestName.tag/test_TestName.tag/
                           results/
        @param host: CrosHost object representing the DUT.
        """
        super(PowerlogTelemetryLogger, self).__init__(config, resultsdir, host)

        self._interval = float(config.get('sweetberry_interval',
                                          self.DEFAULT_SWEETBERRY_INTERVAL))
        self._logdir = os.path.join(resultsdir, 'sweetberry_log')
        self._end_flag = threading.Event()
        self._sweetberry_serial = config.get('sweetberry_serial', None)
        if 'sweetberry_config' in config:
            self._sweetberry_config = config['sweetberry_config']
        else:
            board = self._host.get_board().replace('board:', '')
            hardware_rev = self._host.get_hardware_revision()
            self._sweetberry_config = board + '_' + hardware_rev
        board_path, scenario_path = \
                self._get_sweetberry_config_path(self._sweetberry_config)
        self._sweetberry_thread = SweetberryThread(
                board=board_path,
                scenario=scenario_path,
                interval=self._interval,
                stats_json_dir=self._logdir,
                end_flag=self._end_flag,
                serial=self._sweetberry_serial)
        self._sweetberry_thread.setDaemon(True)

    def _start_measurement(self):
        """Start power measurement with Sweetberry via powerlog tool."""
        self._sweetberry_thread.start()

    def _end_measurement(self):
        """End querying Sweetberry."""
        self._end_flag.set()
        # Sweetberry thread should theoretically finish within 1 self._interval
        # but giving 2 here to be more lenient.
        self._sweetberry_thread.join(self._interval * 2)
        if self._sweetberry_thread.is_alive():
            logging.warning('%s %s thread did not finish. There might be extra '
                            'data at the end.', self.__class__.__name__,
                            self._sweetberry_thread.name)

    def _load_and_trim_data(self, start_ts, end_ts):
        """Load data and trim data.

        Load and format data recorded by powerlog tool. Trim data if necessary.
        """
        if not os.path.exists(self._logdir):
            logging.error('Cannot find %s, no Sweetberry measurements exist, '
                          'not uploading to dashboard.', self._logdir)
            return

        trimmed_log_dirs = list()
        # Adding a padding to both start and end timestamp because the timestamp
        # of each data point is taken at the end of its corresponding interval.
        start_ts = start_ts + self._interval / 2 if start_ts else 0
        end_ts = end_ts + self._interval / 2 if end_ts else time.time()
        for dirname in os.listdir(self._logdir):
            if dirname.startswith('sweetberry'):
                sweetberry_ts = float(string.lstrip(dirname, 'sweetberry'))
                if start_ts <= sweetberry_ts <= end_ts:
                    trimmed_log_dirs.append(dirname)

        data = collections.defaultdict(list)
        for sweetberry_file in sorted(trimmed_log_dirs):
            fname = os.path.join(self._logdir, sweetberry_file, 'summary.json')
            with open(fname, 'r') as f:
                d = json.load(f)
                for k, v in d.iteritems():
                    data[k].append(v['mean'])

        logger = {
            # All data domains should have same sample count.
            'sample_count': len(data.itervalues().next()),
            'sample_duration': self._interval,
            'data': data,
            'average': {k: numpy.average(v) for k, v in data.iteritems()},
            # TODO(mqg): hard code the units for now because we are only dealing
            # with power so far. When we start to work with voltage or current,
            # read the units from the .json files.
            'unit': {k: 'milliwatt' for k in data},
            'type': {k: 'sweetberry' for k in data},
        }

        return [logger]

    def _get_sweetberry_config_path(self, filename):
        """Get the absolute path for Sweetberry board and scenario file.

        @param filename: string of Sweetberry config filename.
        @return a tuple of the path to Sweetberry board file and the path to
                Sweetberry scenario file.
        @raises error.TestError if board file or scenario file does not exist in
                file system.
        """
        board_path = os.path.join(self.SWEETBERRY_CONFIG_DIR,
                                  '%s.board' % filename)
        if not os.path.isfile(board_path):
            msg = 'Sweetberry board file %s does not exist.' % board_path
            raise error.TestError(msg)

        scenario_path = os.path.join(self.SWEETBERRY_CONFIG_DIR,
                                     '%s.scenario' % filename)
        if not os.path.isfile(scenario_path):
            msg = 'Sweetberry scenario file %s does not exist.' % scenario_path
            raise error.TestError(msg)
        return (board_path, scenario_path)


class SweetberryThread(threading.Thread):
    """A thread that starts and ends Sweetberry measurement."""

    def __init__(self, board, scenario, interval, stats_json_dir, end_flag,
                 serial=None):
        """Initialize the Sweetberry thread.

        Once started, this thread will invoke Sweetberry powerlog tool every
        [interval] seconds, which will sample each rail in [scenario] file
        multiple times and write the average of those samples in json format to
        [stats_json_dir]. The resistor size of each power rail is specified in
        [board] file.

        See go/sweetberry and go/sweetberry-readme for more details.

        @param board: file name for Sweetberry board file.
        @param scenario: file name for Sweetberry scenario file.
        @param interval: time of each Sweetberry run cycle; print Sweetberry
                         data every <interval> seconds.
        @param stats_json_dir: directory to store Sweetberry stats in json.
        @param end_flag: event object, stop Sweetberry measurement when this is
                         set.
        @param serial: serial number of sweetberry.
        """
        threading.Thread.__init__(self, name='Sweetberry')
        self._end_flag = end_flag
        self._interval = interval
        self._argv = ['--board', board,
                      '--config', scenario,
                      '--save_stats_json', stats_json_dir,
                      '--no_print_raw_data',
                      '--mW']
        if serial:
            self._argv.extend(['--serial', serial])

    def run(self):
        """Start Sweetberry measurement until end_flag is set."""
        logging.debug('Sweetberry starts.')
        loop = 0
        start_timestamp = time.time()
        while not self._end_flag.is_set():
            # TODO(mqg): in the future use more of powerlog components
            # explicitly, make a long call and harvest data from Sweetberry,
            # instead of using it like a command line tool now.
            loop += 1
            next_loop_start_timestamp = start_timestamp + loop * self._interval
            current_timestamp = time.time()
            this_loop_duration = next_loop_start_timestamp - current_timestamp
            powerlog.main(self._argv + ['--seconds', str(this_loop_duration)])
        logging.debug('Sweetberry stops.')