普通文本  |  216行  |  9.21 KB

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

from autotest_lib.client.common_lib import error
from autotest_lib.server.cros.network import attenuator
from autotest_lib.server.cros.network import attenuator_hosts

from chromite.lib import timeout_util

HOST_TO_FIXED_ATTENUATIONS = attenuator_hosts.HOST_FIXED_ATTENUATIONS


class AttenuatorController(object):
    """Represents a minicircuits variable attenuator.

    This device is used to vary the attenuation between a router and a client.
    This allows us to measure throughput as a function of signal strength and
    test some roaming situations.  The throughput vs signal strength tests
    are referred to rate vs range (RvR) tests in places.

    """

    @property
    def supported_attenuators(self):
        """@return iterable of int attenuators supported on this host."""
        return self._fixed_attenuations.keys()


    def __init__(self, hostname):
        """Construct a AttenuatorController.

        @param hostname: Hostname representing minicircuits attenuator.

        """
        self.hostname = hostname
        super(AttenuatorController, self).__init__()
        if hostname not in HOST_TO_FIXED_ATTENUATIONS.keys():
            raise error.TestError('Unexpected RvR host name %r.' % hostname)
        self._fixed_attenuations = HOST_TO_FIXED_ATTENUATIONS[hostname]
        num_atten = len(self.supported_attenuators)

        self._attenuator = attenuator.Attenuator(hostname, num_atten)
        self.set_variable_attenuation(0)


    def _approximate_frequency(self, attenuator_num, freq):
        """Finds an approximate frequency to freq.

        In case freq is not present in self._fixed_attenuations, we use a value
        from a nearby channel as an approximation.

        @param attenuator_num: attenuator in question on the remote host.  Each
                attenuator has a different fixed path loss per frequency.
        @param freq: int frequency in MHz.
        @returns int approximate frequency from self._fixed_attenuations.

        """
        old_offset = None
        approx_freq = None
        for defined_freq in self._fixed_attenuations[attenuator_num].keys():
            new_offset = abs(defined_freq - freq)
            if old_offset is None or new_offset < old_offset:
                old_offset = new_offset
                approx_freq = defined_freq

        logging.debug('Approximating attenuation for frequency %d with '
                      'constants for frequency %d.', freq, approx_freq)
        return approx_freq


    def close(self):
        """Close variable attenuator connection."""
        self._attenuator.close()


    def set_total_attenuation(self, atten_db, frequency_mhz,
                              attenuator_num=None):
        """Set the total attenuation on one or all attenuators.

        @param atten_db: int level of attenuation in dB.  This must be
                higher than the fixed attenuation level of the affected
                attenuators.
        @param frequency_mhz: int frequency for which to calculate the
                total attenuation.  The fixed component of attenuation
                varies with frequency.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.

        """
        affected_attenuators = self.supported_attenuators
        if attenuator_num is not None:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            freq_to_fixed_loss = self._fixed_attenuations[atten]
            approx_freq = self._approximate_frequency(atten,
                                                      frequency_mhz)
            variable_atten_db = atten_db - freq_to_fixed_loss[approx_freq]
            self.set_variable_attenuation(variable_atten_db,
                                          attenuator_num=atten)


    def set_variable_attenuation(self, atten_db, attenuator_num=None):
        """Set the variable attenuation on one or all attenuators.

        @param atten_db: int non-negative level of attenuation in dB.
        @param attenuator_num: int attenuator to change, or None to
                set all variable attenuators.

        """
        affected_attenuators = self.supported_attenuators
        if attenuator_num is not None:
            affected_attenuators = [attenuator_num]
        for atten in affected_attenuators:
            try:
                self._attenuator.set_atten(atten, atten_db)
                if int(self._attenuator.get_atten(atten)) != atten_db:
                    raise error.TestError('Attenuation did not set as expected '
                                          'on attenuator %d' % atten)
            except error.TestError:
                self._attenuator.reopen(self.hostname)
                self._attenuator.set_atten(atten, atten_db)
                if int(self._attenuator.get_atten(atten)) != atten_db:
                    raise error.TestError('Attenuation did not set as expected '
                                          'on attenuator %d' % atten)
            logging.info('%ddb attenuation set successfully on attenautor %d',
                         atten_db, atten)


    def get_minimal_total_attenuation(self):
        """Get attenuator's maximum fixed attenuation value.

        This is pulled from the current attenuator's lines and becomes the
        minimal total attenuation when stepping through attenuation levels.

        @return maximum starting attenuation value

        """
        max_atten = 0
        for atten_num in self._fixed_attenuations.iterkeys():
            atten_values = self._fixed_attenuations[atten_num].values()
            max_atten = max(max(atten_values), max_atten)
        return max_atten


    def set_signal_level(self, client_context, requested_sig_level,
            min_sig_level_allowed=-85, tolerance_percent=3, timeout=240):
        """Set wifi signal to desired level by changing attenuation.

        @param client_context: Client context object.
        @param requested_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param min_sig_level_allowed: Minimum signal level allowed; this is to
                ensure that we don't set a signal that is too weak and DUT can
                not associate.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.
        """
        atten_db = 0
        starting_sig_level = client_context.wifi_signal_level
        if not starting_sig_level:
            raise error.TestError("No signal detected.")
        if not (min_sig_level_allowed <= requested_sig_level <=
                starting_sig_level):
            raise error.TestError("Requested signal level (%d) is either "
                                  "higher than current signal level (%r) with "
                                  "0db attenuation or lower than minimum "
                                  "signal level (%d) allowed." %
                                  (requested_sig_level,
                                  starting_sig_level,
                                  min_sig_level_allowed))

        try:
            with timeout_util.Timeout(timeout):
                while True:
                    client_context.reassociate(timeout_seconds=1)
                    current_sig_level = client_context.wifi_signal_level
                    logging.info("Current signal level %r", current_sig_level)
                    if not current_sig_level:
                        raise error.TestError("No signal detected.")
                    if self.signal_in_range(requested_sig_level,
                            current_sig_level, tolerance_percent):
                        logging.info("Signal level set to %r.",
                                     current_sig_level)
                        break
                    if current_sig_level > requested_sig_level:
                        self.set_variable_attenuation(atten_db)
                        atten_db +=1
                    if current_sig_level < requested_sig_level:
                        self.set_variable_attenuation(atten_db)
                        atten_db -= 1
        except (timeout_util.TimeoutError, error.TestError,
                error.TestFail) as e:
            raise error.TestError("Not able to set wifi signal to requested "
                                  "level. \n%s" % e)


    def signal_in_range(self, req_sig_level, curr_sig_level, tolerance_percent):
        """Check if wifi signal is within the threshold of requested signal.

        @param req_sig_level: Negative int value in dBm for wifi signal
                level to be set.
        @param curr_sig_level: Current wifi signal level seen by the DUT.
        @param tolerance_percent: Percentage to be used to calculate the desired
                range for the wifi signal level.

        @returns True if wifi signal is in the desired range.
        """
        min_sig = req_sig_level + (req_sig_level * tolerance_percent / 100)
        max_sig = req_sig_level - (req_sig_level * tolerance_percent / 100)
        if min_sig <= curr_sig_level <= max_sig:
            return True
        return False