普通文本  |  345行  |  12.06 KB

# Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Interface to control RF Switch.

Helper module to control RF Switch. Common commands to control the relays
are made available as methods that can be called.

Please refer go/rf-switch for more info on the switch.
"""
import contextlib
import logging

from autotest_lib.server.cros.network.rf_switch import scpi


class RfSwitchException(Exception):
    """Exception for RfSwitch Errors."""


class RfSwitch(scpi.Scpi):
    """RF Switch Controller."""

    _ALL_AP_RELAYS = 'k1_1:k1_24'  # Each AP uses 6 relays
    _ALL_CLIENT_RELAYS = 'k1_25:k1_48'  # Each client uses 6 relays
    _MIN_ENCLOSURE = 1  # Starting enclosure number
    _MAX_ENCLOSURE = 4  # Last enclosure number
    _ALL_ENCLOSURE = 0  # All the enclosures
    _RELAYS_PER_ENCLOSURE = 6  # Number of relays in an enclosure

    # Each AP enclosure uses 6 relays and a R relay to set attenuation
    _AP_ATTENUATOR_RELAYS = {
        1: ['k1_49', 'k1_50', 'k1_51', 'k1_52', 'k1_53', 'k1_54', 'R1_9'],
        2: ['k1_55', 'k1_56', 'k1_57', 'k1_58', 'k1_59', 'k1_60', 'R1_10'],
        3: ['k1_61', 'k1_62', 'k1_63', 'k1_64', 'k1_65', 'k1_66', 'R1_11'],
        4: ['k1_67', 'k1_68', 'k1_69', 'k1_70', 'k1_71', 'k1_72', 'R1_12'],
    }
    # Shorter version
    _AP_ATTENUATOR_RELAYS_SHORT = {
        1: 'k1_49:k1_54,R1_9',
        2: 'k1_55:k1_60,R1_10',
        3: 'k1_61:k1_66,R1_11',
        4: 'k1_67:k1_72,R1_12',
    }

    _CMD_CLOSE_RELAYS = 'ROUT:CLOS'
    _CMD_CHECK_RELAYS_CLOSED = 'ROUT:CLOS?'
    _CMD_OPEN_RELAYS = 'ROUT:OPEN'
    _CMD_OPEN_ALL_RELAYS = 'ROUT:OPEN:ALL'
    _CMD_SET_VERIFY = 'ROUT:CHAN:VER'
    _CMD_GET_VERIFY = 'ROUT:CHAN:VER?'
    _CMD_SET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL'
    _CMD_GET_VERIFY_INVERTED = 'ROUT:CHAN:VER:POL?'
    _CMD_GET_VERIFY_STATE = 'ROUT:CHAN:VER:POS:STAT?'
    _CMD_CHECK_BUSY = 'ROUT:MOD:BUSY?'
    _CMD_WAIT = 'ROUT:MOD:WAIT'

    def __init__(self, host, port=scpi.Scpi.SCPI_PORT):
        """Controller for RF Switch.

        @param host: Hostname or IP address of RF Switch
        @param port: Int SCPI port number (default 5025)

        """
        scpi.Scpi.__init__(self, host, port)

    def send_cmd_check_error(self, cmd):
        """Send command to switch and check for any error.

        @param cmd: string cmd to send to switch

        """
        self.write(cmd)
        code, error = self.error_query()
        if code:
            raise RfSwitchException('Error on command: "%s" - code: %s,'
                                    ' message: %s', cmd, code, error)
        self.wait()

    def close_relays(self, relays):
        """Close relays.

        @param relays: relays to close (, to separate and : for range)

        """
        self.send_cmd_check_error('%s (@%s)\n'
                                  % (self._CMD_CLOSE_RELAYS, relays))

    def relays_closed(self, relays):
        """Get open/closed status of relays.

        @param relays: relays to check (, to separate and : for range)

        @returns tuple of relay status, status is true if a relay is closed.

        """

        status = self.query('%s (@%s)\n' % (self._CMD_CHECK_RELAYS_CLOSED,
                                            relays)).strip().split(',')
        return tuple(bool(int(x)) for x in status)

    def open_relays(self, relays):
        """Open relays.

        @param relays: string relays to close (, to separate and : for range)

        """
        self.send_cmd_check_error('%s (@%s)\n' % (
                self._CMD_OPEN_RELAYS, relays))

    def open_all_relays(self):
        """Open all relays.

        This will open all relays including the attenuator, which will set it
        to Max. Please remember to set your attenuation to right level after
        this call.

        """
        self.send_cmd_check_error('%s\n' % self._CMD_OPEN_ALL_RELAYS)

    def set_verify(self, relays, on):
        """Configure Close? to return indicator state instead of driven state.

        @param relays: relays to verify (, to separate and : for range)
        @param on: string state to verify (on(True)/off(False))

        """
        self.send_cmd_check_error('%s %s,(@%s)\n' % (
            self._CMD_SET_VERIFY, int(bool(on)), relays))

    def get_verify(self, relays):
        """Get the verify mode of relays.

        @param relays: relays to verify (, to separate and : for range)
        @returns tuple of verify mode for relays

        """
        status = self.query('%s (@%s)\n' % (
            self._CMD_GET_VERIFY, relays)).strip().split(',')
        return tuple(bool(int(x)) for x in status)

    def set_verify_inverted(self, relays, inverted):
        """Set the polarity confidence of relay to be inverted.

        @param relays: relays to set (, to separate and : for range)
        @param inverted: Boolean True if INV, False for NORM

        """
        self.send_cmd_check_error('%s %s,(@%s)\n' % (
                self._CMD_SET_VERIFY_INVERTED,
                'INV' if inverted else 'NORM', relays))

    def get_verify_inverted(self, relays):
        """Get the confidence polarity. 1 is inverted, 0 is value as-is.

        @param relays: relays to get (, to separate and : for range)
        @returns tuple of status where 1 in inverted and 0 for value as-is

        """
        status = self.query('%s (@%s)\n' % (self._CMD_GET_VERIFY_INVERTED,
                                            relays)).strip().split(',')
        return tuple(bool(int(x)) for x in status)

    def get_verify_state(self, relays):
        """If verify set get driven state, else confidence state.

        @param relays: relays to get (, to separate and : for range)

        @returns tuple of verify status for given relays

        """
        status = self.query('%s (@%s)\n' % (self._CMD_GET_VERIFY_STATE,
                                            relays)).strip().split(',')
        return tuple(bool(int(x)) for x in status)

    @property
    def busy(self):
        """Check relays are still settling.

        @returns Boolean True if relays are settling, False if not.

        """
        return bool(int(self.query('%s\n' % self._CMD_CHECK_BUSY).strip()))

    def wait(self):
        """Wait for relays to debounce."""
        self.write('%s\n' % self._CMD_WAIT)

    def get_attenuation(self, ap_enclosure):
        """Get Attenuation for an AP enclosure.

        Attenuation is set by turning on/off the relays.  Each relay specifies
        a bit in the binary value of attenuation.  Find the relay status and
        build the binary value, then find the decimal value from it.

        @param ap_enclosure: Int 1/2/3/4 (AP enclosure index)

        @returns attenuation value in int

        @raises ValueError: on bad ap_enclosure value

        """
        if (ap_enclosure < self._MIN_ENCLOSURE or
                ap_enclosure > self._MAX_ENCLOSURE):
            raise ValueError('Invalid AP enclosure number: %s.', ap_enclosure)
        else:
            a_status = self.relays_closed(
                self._AP_ATTENUATOR_RELAYS_SHORT[ap_enclosure])
            status = a_status[::-1]  # reverse for Endian transform
            logging.debug('attenuator status: %s', status)

            # build the binary value
            binary = ''.join(['0' if x else '1' for x in status])
            return int(binary, 2)

    def get_all_attenuation(self):
        """Get attenuation value for all AP enclosures.

        @returns tuple of attenuation value for each enclosure

        """
        attenuations = []
        for x in xrange(self._MIN_ENCLOSURE, self._MAX_ENCLOSURE + 1):
            attenuations.append(self.get_attenuation(x))
        return tuple(attenuations)

    def set_attenuation(self, ap_enclosure, attenuation):
        """Set attenuation for an AP enclosure.

        @param ap_enclosure: Int 0,1,2,3,4 AP enclosure number. 0 for all
        @param attenuation: Int Attenuation value to set

        @raises ValueError: on bad ap_enclosure value

        """
        if ap_enclosure == self._ALL_ENCLOSURE:
            # set attenuation on all
            for x in xrange(self._MIN_ENCLOSURE, self._MAX_ENCLOSURE + 1):
                self.set_attenuation(x, attenuation)
        elif (ap_enclosure < self._MIN_ENCLOSURE or
              ap_enclosure > self._MAX_ENCLOSURE):
            raise ValueError('Bad AP enclosure value: %s' % ap_enclosure)
        else:
            # convert attenuation decimal value to binary
            bin_array = [int(x) for x in list('{0:07b}'.format(attenuation))]
            # For endian
            reverse_bits = self._AP_ATTENUATOR_RELAYS[ap_enclosure][:: -1]

            # determine which bits should be set
            relays_to_close = [
                reverse_bits[i] for i, j in enumerate(bin_array) if not j
            ]

            # open all relay for attenuator & then close the ones we need
            relays = ','.join(self._AP_ATTENUATOR_RELAYS[ap_enclosure])
            self.open_relays(relays)
            logging.debug('Attenuator relays opened: %s', relays)
            relays = ','.join(relays_to_close)
            self.close_relays(relays)
            logging.debug('Attenuator relays closed: %s', relays)

    def get_ap_connections(self):
        """Get a list of AP to client connections.

        @returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)

        """
        # Get the closed status for relays connected to AP enclosures.
        ap_connections = self.relays_closed(self._ALL_AP_RELAYS)

        # Find out the connections
        connections = []
        for index, relay_num in enumerate(ap_connections):
            # if the relay is closed, there is a connection
            if relay_num:
                connection = {
                    'AP': (index / self._RELAYS_PER_ENCLOSURE) + 1,
                    'Client': (index % self._RELAYS_PER_ENCLOSURE) + 1
                }
                connections.append(connection)
        return tuple(connections)

    def get_client_connections(self):
        """Get a list of AP to client connections.

        @returns tuple of dict of connections ({'AP': 1, 'Client': 1}, ...)

        """
        # Get the closed status for relays connected to client enclosures.
        c_connections = self.relays_closed(self._ALL_CLIENT_RELAYS)

        # Find out the connections
        connections = []
        for index, relay_num in enumerate(c_connections):
            # if the relay is closed, there is a connection
            if relay_num:
                connection = {
                    'AP': (index % self._RELAYS_PER_ENCLOSURE) + 1,
                    'Client': (index / self._RELAYS_PER_ENCLOSURE) + 1
                }
                connections.append(connection)
        return tuple(connections)

    def connect_ap_client(self, ap, client):
        """Connect AP and a Client enclosure.

        @param ap: Int 1/2/3/4 AP enclosure index
        @param client: Int 1/2/3/4 Client enclosure index

        @raises ValueError: when ap / client value is not 1/2/3/4

        """
        if (ap < self._MIN_ENCLOSURE or ap > self._MAX_ENCLOSURE):
            raise ValueError(
                'AP enclosure should be 1/2/3/4. Given: %s' % ap)
        if (client < self._MIN_ENCLOSURE or
                client > self._MAX_ENCLOSURE):
            raise ValueError('Client enclosure should be 1/2/3/4. Given: %s' %
                             client)

        # Relay index start from 0 so subtract 1 for index
        relays = 'k1_%s,k1_%s' % (
            ((int(ap) - 1) * self._RELAYS_PER_ENCLOSURE) + int(client),
            (((int(client) - 1) + self._MAX_ENCLOSURE) *
             self._RELAYS_PER_ENCLOSURE) + int(ap))
        logging.debug('Relays to close: %s', relays)
        self.close_relays(relays)


@contextlib.contextmanager
def RfSwitchSession(host, port=scpi.Scpi.SCPI_PORT):
    """Start a RF Switch session and close it when done.

    @param host: Hostname or IP address of RF Switch
    @param port: Int SCPI port number (default 5025)

    @yields: an instance of InteractiveSsh
    """
    session = RfSwitch(host, port)
    try:
        yield session
    finally:
        session.close()