普通文本  |  152行  |  4.79 KB

#!/usr/bin/python
# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Wrapper for an RF switch built on an Elexol EtherIO24.

The EtherIO is documented at
        http://www.elexol.com/IO_Modules/Ether_IO_24_Dip_R.php

This file is both a python module and a command line utility to speak
to the module
"""

import cellular_logging
import collections
import socket
import struct
import sys

log = cellular_logging.SetupCellularLogging('ether_io_rf_switch')


class Error(Exception):
    pass


class EtherIo24(object):
    """Encapsulates an EtherIO24 UDP-GPIO bridge."""

    def __init__(self, hostname, port=2424):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.socket.bind(('', 0))
        self.destination = (hostname, port)
        self.socket.settimeout(3)   # In seconds

    def SendPayload(self, payload):
        self.socket.sendto(payload, self.destination)

    def SendOperation(self, opcode, list_bytes):
        """Sends the specified opcode with [list_bytes] as an argument."""
        payload = opcode + struct.pack(('=%dB' % len(list_bytes)), *list_bytes)
        self.SendPayload(payload)
        return payload

    def SendCommandVerify(self, write_opcode, list_bytes, read_opcode=None):
        """Sends opcode and bytes,
        then reads to make sure command was executed."""
        if read_opcode is None:
            read_opcode = write_opcode.lower()
        for _ in xrange(3):
            write_sent = self.SendOperation(write_opcode, list_bytes)
            self.SendOperation(read_opcode, list_bytes)
            try:
                response = self.AwaitResponse()
                if response == write_sent:
                    return
                else:
                    log.warning('Unexpected reply:  sent %s, got %s',
                                write_sent.encode('hex_codec'),
                                response.encode('hex_codec'))
            except socket.timeout:
                log.warning('Timed out awaiting reply for %s', write_opcode)
                continue
        raise Error('Failed to execute %s' % write_sent.encode('hex_codec'))

    def AwaitResponse(self):
        (response, address) = self.socket.recvfrom(65536)
        if (socket.gethostbyname(address[0]) !=
                socket.gethostbyname(self.destination[0])):
            log.warning('Unexpected reply source: %s (expected %s)',
                        address, self.destination)
        return response


class RfSwitch(object):
    """An RF switch hooked to an Elexol EtherIO24."""

    def __init__(self, ip):
        self.io = EtherIo24(ip)
        # Must run on pythons without 0bxxx notation.  These are 1110,
        # 1101, 1011, 0111
        decode = [0xe, 0xd, 0xb, 0x7]

        self.port_mapping = []
        for upper in xrange(3):
            for lower in xrange(4):
                self.port_mapping.append(decode[upper] << 4 | decode[lower])

    def SelectPort(self, n):
        """Connects port n to the RF generator."""
        # Set all pins to output

        # !A0:  all pins output
        self.io.SendCommandVerify('!A', [0])
        self.io.SendCommandVerify('A', [self.port_mapping[n]])

    def Query(self):
        """Returns (binary port status, selected port, port direction)."""
        self.io.SendOperation('!a', [])
        raw_direction = self.io.AwaitResponse()
        direction = ord(raw_direction[2])

        self.io.SendOperation('a', [])
        status = ord(self.io.AwaitResponse()[1])
        try:
            port = self.port_mapping.index(status)
        except ValueError:
            port = None

        return status, port, direction


def CommandLineUtility(arguments):
    """Command line utility to control a switch."""

    def Select(switch, remaining_args):
        switch.SelectPort(int(remaining_args.popleft()))

    def Query(switch, unused_remaining_args):
        (raw_status, port, direction) = switch.Query()
        if direction != 0x00:
            print 'Warning: Direction register is %x, should be 0x00' % \
                  direction
        if port is None:
            port_str = 'Invalid'
        else:
            port_str = str(port)
        print 'Port %s  (0x%x)' % (port_str, raw_status)

    def Usage():
        print 'usage:  %s hostname {query|select portnumber}' % sys.argv[0]
        exit(1)

    try:
        hostname = arguments.popleft()
        operation = arguments.popleft()

        switch = RfSwitch(hostname)

        if operation == 'query':
            Query(switch, arguments)
        elif operation == 'select':
            Select(switch, arguments)
        else:
            Usage()
    except IndexError:
        Usage()

if __name__ == '__main__':
    CommandLineUtility(collections.deque(sys.argv[1:]))