#!/usr/bin/python # Copyright (c) 2011 The Chromium OS Authors. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. """Wrapper for an RF switch built on an Elexol EtherIO24. The EtherIO is documented at http://www.elexol.com/IO_Modules/Ether_IO_24_Dip_R.php This file is both a python module and a command line utility to speak to the module """ import cellular_logging import collections import socket import struct import sys log = cellular_logging.SetupCellularLogging('ether_io_rf_switch') class Error(Exception): pass class EtherIo24(object): """Encapsulates an EtherIO24 UDP-GPIO bridge.""" def __init__(self, hostname, port=2424): self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.bind(('', 0)) self.destination = (hostname, port) self.socket.settimeout(3) # In seconds def SendPayload(self, payload): self.socket.sendto(payload, self.destination) def SendOperation(self, opcode, list_bytes): """Sends the specified opcode with [list_bytes] as an argument.""" payload = opcode + struct.pack(('=%dB' % len(list_bytes)), *list_bytes) self.SendPayload(payload) return payload def SendCommandVerify(self, write_opcode, list_bytes, read_opcode=None): """Sends opcode and bytes, then reads to make sure command was executed.""" if read_opcode is None: read_opcode = write_opcode.lower() for _ in xrange(3): write_sent = self.SendOperation(write_opcode, list_bytes) self.SendOperation(read_opcode, list_bytes) try: response = self.AwaitResponse() if response == write_sent: return else: log.warning('Unexpected reply: sent %s, got %s', write_sent.encode('hex_codec'), response.encode('hex_codec')) except socket.timeout: log.warning('Timed out awaiting reply for %s', write_opcode) continue raise Error('Failed to execute %s' % write_sent.encode('hex_codec')) def AwaitResponse(self): (response, address) = self.socket.recvfrom(65536) if (socket.gethostbyname(address[0]) != socket.gethostbyname(self.destination[0])): log.warning('Unexpected reply source: %s (expected %s)', address, self.destination) return response class RfSwitch(object): """An RF switch hooked to an Elexol EtherIO24.""" def __init__(self, ip): self.io = EtherIo24(ip) # Must run on pythons without 0bxxx notation. These are 1110, # 1101, 1011, 0111 decode = [0xe, 0xd, 0xb, 0x7] self.port_mapping = [] for upper in xrange(3): for lower in xrange(4): self.port_mapping.append(decode[upper] << 4 | decode[lower]) def SelectPort(self, n): """Connects port n to the RF generator.""" # Set all pins to output # !A0: all pins output self.io.SendCommandVerify('!A', [0]) self.io.SendCommandVerify('A', [self.port_mapping[n]]) def Query(self): """Returns (binary port status, selected port, port direction).""" self.io.SendOperation('!a', []) raw_direction = self.io.AwaitResponse() direction = ord(raw_direction[2]) self.io.SendOperation('a', []) status = ord(self.io.AwaitResponse()[1]) try: port = self.port_mapping.index(status) except ValueError: port = None return status, port, direction def CommandLineUtility(arguments): """Command line utility to control a switch.""" def Select(switch, remaining_args): switch.SelectPort(int(remaining_args.popleft())) def Query(switch, unused_remaining_args): (raw_status, port, direction) = switch.Query() if direction != 0x00: print 'Warning: Direction register is %x, should be 0x00' % \ direction if port is None: port_str = 'Invalid' else: port_str = str(port) print 'Port %s (0x%x)' % (port_str, raw_status) def Usage(): print 'usage: %s hostname {query|select portnumber}' % sys.argv[0] exit(1) try: hostname = arguments.popleft() operation = arguments.popleft() switch = RfSwitch(hostname) if operation == 'query': Query(switch, arguments) elif operation == 'select': Select(switch, arguments) else: Usage() except IndexError: Usage() if __name__ == '__main__': CommandLineUtility(collections.deque(sys.argv[1:]))