#!/usr/bin/python
# Copyright (C) 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import fcntl
import logging
logging.getLogger().setLevel(logging.ERROR)
import os.path
import select
import stat
import struct
import sys
import time
import collections
import socket
import glob
import signal
import serial # http://pyserial.sourceforge.net/
# Set to True to have Power_Monitor.log() report non-debug messages through
# logging.error() instead of logging.debug().
LOG_TO_SCREEN = False
TIMEOUT_SERIAL = 1  # seconds; read timeout handed to serial.Serial()

# Ignore SIGCONT so that resuming the process does not disturb it.
for signum in [signal.SIGCONT]:
    signal.signal(signum, signal.SIG_IGN)

try:
    from . import Abstract_Power_Monitor
except (ImportError, ValueError, SystemError):
    # A relative import outside of a package fails with ValueError
    # (Python 2), SystemError (Python 3.3-3.5) or ImportError (3.6+).
    # Any other error raised while importing the package is a real bug
    # and is left to propagate with its own traceback.
    sys.exit("You cannot run 'monsoon.py' directly. Run 'execut_power_tests.py' instead.")
class Power_Monitor(Abstract_Power_Monitor):
    """
    Provides a simple class to use the power meter over a serial port, e.g.

        mon = monsoon.Power_Monitor()
        mon.SetVoltage(3.7)
        mon.StartDataCollection()
        mydata = []
        while len(mydata) < 1000:
            samples = mon.CollectData()
            if samples:  # CollectData() returns None when a read times out
                mydata.extend(samples)
        mon.StopDataCollection()
    """
    # NOTE(review): not read by any method of this class in this file;
    # presumably consumed by the base class or by callers - TODO confirm.
    _do_log = False
@staticmethod
def lock(device):
    """Take an exclusive, non-blocking lock for a serial device.

    The lock is an fcntl lock on the file
    /tmp/monsoon.<os name>.<basename of device>.

    Args:
        device: path of the serial device, e.g. "/dev/ttyACM0".

    Returns:
        The open lock file object. Closing it gives the lock up.

    Raises:
        SystemExit: ('device in use') when the lock cannot be taken.
    """
    tmpname = "/tmp/monsoon.%s.%s" % (os.uname()[0],
                                      os.path.basename(device))
    lockfile = open(tmpname, "w")
    try:  # use a lockfile to ensure exclusive access
        fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError:
        # This is a static method: there is no instance to log through,
        # so report via the logging module and do not leak the handle.
        lockfile.close()
        logging.warning("device %s is in use" % device)
        sys.exit('device in use')
    logging.debug("Locked device %s" % device)
    return lockfile
def to_string(self):
    """Return the device name recorded when the monitor was set up."""
    device_name = self._devicename
    return device_name
def __init__(self, device=None, wait=False, log_file_id=None):
    """
    Establish a connection to a Power_Monitor.

    Args:
        device: path of the serial device to open, or an already open
            serial.Serial instance to adopt. When omitted, Discover() is
            used and exactly one monitor must be attached.
        wait: when no monitor is discovered, retry once per second
            instead of exiting.
        log_file_id: when not None, log() output is appended to
            /tmp/monsoon_<os name>_<device basename>.<log_file_id>.log

    Raises:
        IOError: the device did not answer the status request.
        SystemExit: no monitor was found (wait is false), more than one
            was found, or the device lock is held elsewhere.
        Exception: whatever serial.Serial() raised when opening failed.
    """
    self._lockfile = None
    self._logfile = None
    self.ser = None
    handled_signals = [signal.SIGALRM, signal.SIGHUP, signal.SIGINT,
                       signal.SIGILL, signal.SIGQUIT,
                       signal.SIGTRAP, signal.SIGABRT, signal.SIGIOT,
                       signal.SIGBUS, signal.SIGFPE, signal.SIGSEGV,
                       signal.SIGUSR2, signal.SIGPIPE, signal.SIGTERM]
    for signum in handled_signals:
        signal.signal(signum, self.handle_signal)
    self._coarse_ref = self._fine_ref = self._coarse_zero = self._fine_zero = 0
    self._coarse_scale = self._fine_scale = 0
    self._last_seq = 0
    self.start_voltage = 0

    preopened = None
    if device:
        if isinstance(device, serial.Serial):
            # Adopt the caller's port; the lock and log file names are
            # derived from the device name pySerial reports for it.
            preopened = device
            device = preopened.port
    else:
        device_list = None
        while not device_list:
            device_list = Power_Monitor.Discover()
            if device_list:
                break
            if wait:
                time.sleep(1.0)
                logging.info("No power monitor serial devices found. Retrying...")
            else:
                logging.error("No power monitor serial devices found. Exiting")
                self.Close()
                sys.exit("No power monitor serial devices found")
        if len(device_list) > 1:
            logging.error("=======================================")
            logging.error("More than one power monitor discovered!")
            logging.error("Test may not execute properly.Aborting test.")
            logging.error("=======================================")
            sys.exit("More than one power monitor connected.")
        found = device_list[0]
        device = found.to_string()
        # Discover() returns a live monitor holding the port and its lock.
        # Release it before re-opening: fcntl locks belong to the process,
        # so a later close of its lock file would drop our lock as well.
        found.Close()
        # Constructing that monitor re-pointed the handlers at it.
        for signum in handled_signals:
            signal.signal(signum, self.handle_signal)
        logging.info("Power monitor @ %s" % device)

    self._lockfile = Power_Monitor.lock(device)
    if log_file_id is not None:
        self._logfilename = "/tmp/monsoon_%s_%s.%s.log" % (
            os.uname()[0], os.path.basename(device), log_file_id)
        self._logfile = open(self._logfilename, 'a')

    if preopened is not None:
        self.ser = preopened
    else:
        try:
            self.ser = serial.Serial(device, timeout=TIMEOUT_SERIAL)
        except Exception as e:
            self.log("error opening device %s: %s" % (device, e))
            self._lockfile.close()
            raise

    logging.debug("Setting up power monitor...")
    self._devicename = device
    # just in case, stop any active data collection on monsoon
    self._dataCollectionActive = True
    self.StopDataCollection()
    logging.debug("Flushing input...")
    self._FlushInput()  # discard stale input
    logging.debug("Getting status....")
    status = self.GetStatus()
    if not status:
        self.log("no response from device %s" % device)
        self._lockfile.close()
        raise IOError("Failed to get status from device")
    self.start_voltage = status["voltage1"]
def __del__(self):
    """Run Close() when the object is destroyed."""
    self.Close()
def Close(self):
    """Release the log file, the serial port and the device lock.

    Each resource is detached from the instance as it is released, so the
    method may be called repeatedly (it also runs from __del__).

    Raises:
        Whatever flushing or closing the serial port raises; the port is
        still closed and the lock still released in that case.
    """
    if self._logfile:
        print("=============\n" +
              "Power Monitor log file can be found at '%s'\n" % self._logfilename +
              "=============\n")
        self._logfile.close()
        self._logfile = None
    try:
        if self.ser:
            port, self.ser = self.ser, None
            try:
                port.flush()
            finally:
                # Close even when the flush fails (e.g. device unplugged).
                port.close()
    finally:
        if self._lockfile:
            lockfile, self._lockfile = self._lockfile, None
            lockfile.close()
def log(self, msg, debug=False):
    """Record a message in the log file (if any) and via `logging`.

    Args:
        msg: text to record; a newline is appended in the log file.
        debug: when true the message is always reported at DEBUG level.
            Otherwise it is reported at ERROR level if LOG_TO_SCREEN is
            set and at DEBUG level if it is not.
    """
    logfile = self._logfile
    if logfile:
        logfile.write(msg + "\n")
    to_screen = not debug and LOG_TO_SCREEN
    emit = logging.error if to_screen else logging.debug
    emit(msg)
def handle_signal(self, signum, frame):
    """Signal handler: shut the serial port, log the signal and exit.

    Args:
        signum: number of the signal received.
        frame: current stack frame (unused).

    Raises:
        SystemExit: always, carrying the signal number in its message.
    """
    port = self.ser
    if port:
        port.flush()
        port.close()
        self.ser = None
    self.log("Got signal %d" % signum)
    sys.exit("\nGot signal %d\n" % signum)
@staticmethod
def Discover():
    """Probe every /dev/ttyACM* port and return the power monitors found.

    A port is first locked and opened to check that it is usable, then a
    Power_Monitor is constructed on it and asked for its status.

    Returns:
        List (possibly empty) of Power_Monitor instances that answered a
        status request. Each one is left open, so it keeps its serial
        port and device lock until it is Close()d.
    """
    monsoon_list = []
    logging.info("Discovering power monitor(s)...")
    ser_device_list = glob.glob("/dev/ttyACM*")
    logging.info("Seeking devices %s" % ser_device_list)
    for dev in ser_device_list:
        try:
            lockfile = Power_Monitor.lock(dev)
        except (Exception, SystemExit):
            # lock() reports a busy device by exiting, hence SystemExit.
            logging.info("... device %s in use, skipping" % dev)
            continue
        ser = None
        try:
            tries = 0
            while ser is None and tries < 100:
                try:  # try to open the device
                    ser = serial.Serial(dev, timeout=TIMEOUT_SERIAL)
                except Exception as e:
                    logging.error("error opening device %s: %s" % (dev, e))
                    tries += 1
                    time.sleep(2)
        finally:
            lockfile.close()  # will be re-locked once monsoon instance created
            logging.debug("unlocked")
        if ser is None:
            continue
        logging.info("... found device %s" % dev)
        # The probe handle only proved the port opens; release it so the
        # monitor below is the sole owner of the port.
        ser.close()
        try:
            monsoon = Power_Monitor(device=dev)
            status = monsoon.GetStatus()
        except Exception:
            # SystemExit/KeyboardInterrupt (e.g. raised by the signal
            # handler) are deliberately not swallowed here.
            import traceback
            traceback.print_exc()
            logging.error("... %s appears to not be a monsoon device" % dev)
            continue
        if not status:
            monsoon.log("... no response from device %s, skipping" % dev)
            monsoon.Close()
            continue
        logging.info("... found power monitor @ %s" % dev)
        monsoon_list.append(monsoon)
    logging.debug("Returning list of %s" % monsoon_list)
    return monsoon_list
def GetStatus(self):
    """Request a status packet and return it as a dictionary.

    Packets that are not a full-length status packet (type 0x10) are
    logged and discarded until one arrives.

    Returns:
        dict keyed by the STATUS_FIELDS names, or None when reading a
        packet fails (timeout, short read, bad checksum, empty packet).
        Raw values are rescaled as follows; the others are returned as
        received:
          *VoltageSetting          2.0 + raw * 0.01
          voltage* / *Voltage      raw * 0.000125
          *Resistor                0.05 + raw * 0.0001 (+0.05 for aux)
          *CurrentLimit            8 * (1023 - raw) / 1023
        NOTE(review): units are presumably volts, ohms and amps - confirm
        against the device documentation.
    """
    self.log("Getting status...", debug=True)
    # status packet format
    STATUS_FORMAT = ">BBBhhhHhhhHBBBxBbHBHHHHBbbHHBBBbbbbbbbbbBH"
    STATUS_FIELDS = [
        "packetType", "firmwareVersion", "protocolVersion",
        "mainFineCurrent", "usbFineCurrent", "auxFineCurrent", "voltage1",
        "mainCoarseCurrent", "usbCoarseCurrent", "auxCoarseCurrent", "voltage2",
        "outputVoltageSetting", "temperature", "status", "leds",
        "mainFineResistor", "serialNumber", "sampleRate",
        "dacCalLow", "dacCalHigh",
        "powerUpCurrentLimit", "runTimeCurrentLimit", "powerUpTime",
        "usbFineResistor", "auxFineResistor",
        "initialUsbVoltage", "initialAuxVoltage",
        "hardwareRevision", "temperatureLimit", "usbPassthroughMode",
        "mainCoarseResistor", "usbCoarseResistor", "auxCoarseResistor",
        "defMainFineResistor", "defUsbFineResistor", "defAuxFineResistor",
        "defMainCoarseResistor", "defUsbCoarseResistor", "defAuxCoarseResistor",
        "eventCode", "eventData", ]
    STATUS_PACKET_TYPE = 0x10
    expected_len = struct.calcsize(STATUS_FORMAT)

    self._SendStruct("BBB", 0x01, 0x00, 0x00)
    while True:  # Keep reading, discarding non-status packets
        packet = self._ReadPacket()
        if not packet:
            return None
        # bytearray gives an int for the first byte on Python 2 and 3.
        packet_type = bytearray(packet[:1])[0]
        if len(packet) != expected_len or packet_type != STATUS_PACKET_TYPE:
            self.log("wanted status, dropped type=0x%02x, len=%d" % (
                packet_type, len(packet)))
            continue
        status = dict(zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, packet)))
        for k in list(status):
            if k.endswith("VoltageSetting"):
                status[k] = 2.0 + status[k] * 0.01
            elif k.endswith("FineCurrent"):
                pass  # needs calibration data
            elif k.endswith("CoarseCurrent"):
                pass  # needs calibration data
            elif k.startswith("voltage") or k.endswith("Voltage"):
                status[k] = status[k] * 0.000125
            elif k.endswith("Resistor"):
                status[k] = 0.05 + status[k] * 0.0001
                if k.startswith("aux") or k.startswith("defAux"):
                    status[k] += 0.05
            elif k.endswith("CurrentLimit"):
                status[k] = 8 * (1023 - status[k]) / 1023.0
        return status
def RampVoltage(self, start, end):
    """Raise the output voltage towards `end` in 0.1 steps.

    Ramping begins at `start`, or at 3.0 when `start` is lower, and waits
    0.1 s after each step. `end` itself is always set last, including
    when it is not above the starting level.
    """
    # protocol doesn't support lower than this
    level = 3.0 if start < 3.0 else start
    while level < end:
        self.SetVoltage(level)
        level += .1
        time.sleep(.1)
    self.SetVoltage(end)
def SetVoltage(self, v):
    """Set the output voltage, 0 to disable.

    Non-zero voltages are sent as the number of 0.01 steps above 2.0,
    packed into one unsigned byte.

    Args:
        v: requested voltage, or 0 to disable the output.

    Raises:
        struct.error: the encoded setting does not fit in one byte.
    """
    self.log("Setting voltage to %s..." % v, debug=True)
    if v == 0:
        setting = 0x00
    else:
        # Round rather than truncate: (3.3 - 2.0) * 100 evaluates to
        # 129.99999999999997 and must be sent as 130, not 129.
        setting = int(round((v - 2.0) * 100))
    self._SendStruct("BBB", 0x01, 0x01, setting)
    self.log("...Set voltage", debug=True)
def SetMaxCurrent(self, i):
    """Set the max output current.

    The limit is encoded as 1023 - int(i / 8 * 1023) and sent as two
    commands: low byte (0x0a) first, then high byte (0x0b).

    Args:
        i: current limit, between 0 and 8 inclusive.

    Raises:
        AssertionError: `i` is outside 0..8.
    """
    # NOTE(review): this range check disappears under `python -O`.
    assert i >= 0 and i <= 8
    self.log("Setting max current to %s..." % i, debug=True)
    # Divide by a float: under Python 2, `i/8` with an int argument is
    # integer division and collapses every limit below 8 to zero.
    val = 1023 - int((i / 8.0) * 1023)
    self._SendStruct("BBB", 0x01, 0x0a, val & 0xff)
    self._SendStruct("BBB", 0x01, 0x0b, val >> 8)
    self.log("...Set max current.", debug=True)
def SetUsbPassthrough(self, val):
    """Select the USB passthrough mode.

    Args:
        val: 0 = off, 1 = on, 2 = auto.
    """
    command = ("BBB", 0x01, 0x10, val)
    self._SendStruct(*command)
def StartDataCollection(self):
    """Ask the device to begin collecting and streaming measurements.

    Sends two commands and then marks data collection as active, so that
    StopDataCollection() knows to drain the stream.
    """
    self.log("Starting data collection...", debug=True)
    commands = (
        ("BBB", (0x01, 0x1b, 0x01)),  # Mystery command
        ("BBBBBBB", (0x02, 0xff, 0xff, 0xff, 0xff, 0x03, 0xe8)),
    )
    for fmt, fields in commands:
        self._SendStruct(fmt, *fields)
    self.log("...started", debug=True)
    self._dataCollectionActive = True
def StopDataCollection(self):
    """Ask the device to stop collecting measurement data.

    When collection was marked active, packets still in flight are read
    and thrown away until CollectData() returns None.
    """
    self._SendStruct("BB", 0x03, 0x00)  # stop
    draining = self._dataCollectionActive
    while draining:
        draining = self.CollectData(False) is not None
    self._dataCollectionActive = False
def CollectData(self, verbose=True):
    """Return some current samples. Call StartDataCollection() first.

    Reads packets until a measurement packet can be converted. Packets
    shorter than 13 bytes, or whose first byte is outside 0x20-0x2F, are
    dropped. Byte 0 is the sequence marker and byte 1 the packet type:
      0  measurements (dropped until both scales are calibrated)
      1  zero calibration  (fine from sample 0, coarse from sample 1)
      2  reference calibration (same layout)
    Samples are four big-endian 16-bit values starting at byte 4; only
    the first ("main") value is used. If its lowest bit is set, the
    coarse zero/scale are applied to the value with that bit cleared,
    otherwise the fine zero/scale are applied.

    Args:
        verbose: log timeouts and dropped non-data packets.

    Returns:
        List of calibrated main-channel readings from one packet, or
        None when no packet could be read.
        NOTE(review): presumably amps - confirm.
    """
    while True:  # loop until we get data or a timeout
        packet = self._ReadPacket(verbose)
        if not packet:
            return None
        # bytearray yields ints for the header bytes on Python 2 and 3.
        header = bytearray(packet[:4])
        if len(packet) < 4 + 8 + 1 or not 0x20 <= header[0] <= 0x2F:
            if verbose:
                self.log("wanted data, dropped type=0x%02x, len=%d" % (
                    header[0], len(packet)), debug=verbose)
            continue
        seq, packet_type = header[0], header[1]
        data = [struct.unpack_from(">hhhh", packet, offset)
                for offset in range(4, len(packet) - 8, 8)]
        if self._last_seq and (seq & 0xF) != ((self._last_seq + 1) & 0xF):
            self.log("data sequence skipped, lost packet?")
        self._last_seq = seq

        if packet_type == 0:
            if not self._coarse_scale or not self._fine_scale:
                self.log("waiting for calibration, dropped data packet")
                continue
            out = []
            for main, usb, aux, voltage in data:
                if main & 1:
                    out.append(((main & ~1) - self._coarse_zero) * self._coarse_scale)
                else:
                    out.append((main - self._fine_zero) * self._fine_scale)
            return out
        elif packet_type in (1, 2):
            if len(data) < 2:
                # Both a fine and a coarse sample are needed.
                self.log("calibration packet too short, dropped")
                continue
            if packet_type == 1:
                self._fine_zero = data[0][0]
                self._coarse_zero = data[1][0]
            else:
                self._fine_ref = data[0][0]
                self._coarse_ref = data[1][0]
        else:
            self.log("discarding data packet type=0x%02x" % packet_type)
            continue

        # A calibration value changed: refresh the scales.
        if self._coarse_ref != self._coarse_zero:
            self._coarse_scale = 2.88 / (self._coarse_ref - self._coarse_zero)
        if self._fine_ref != self._fine_zero:
            self._fine_scale = 0.0332 / (self._fine_ref - self._fine_zero)
def _SendStruct(self, fmt, *args):
    """Pack `args` with `fmt`, frame the result and write it to the port.

    The frame is: one length byte (payload size + 1), the payload, and a
    checksum byte equal to (length + sum of payload bytes) modulo 256.
    """
    payload = struct.pack(fmt, *args)
    length = len(payload) + 1
    checksum = (length + sum(bytearray(payload))) % 256
    frame = b"".join([struct.pack("B", length), payload,
                      struct.pack("B", checksum)])
    self.ser.write(frame)
    self.ser.flush()
def _ReadPacket(self, verbose=True):
    """Read a single data record (without length or checksum).

    The first byte read is the record length; that many further bytes
    are then read, the last being a checksum equal to
    (length + sum of the preceding bytes) modulo 256.

    Args:
        verbose: log when the length byte cannot be read in time.

    Returns:
        The payload bytes; an empty byte string for a zero-length
        record; None on timeout, short read or checksum mismatch.
    """
    len_char = self.ser.read(1)
    if not len_char:
        if verbose:
            self.log("timeout reading from serial port")
        return None
    # bytearray yields ints on both Python 2 (str) and Python 3 (bytes).
    data_len = bytearray(len_char)[0]
    if not data_len:
        return b""
    result = self.ser.read(data_len)
    if len(result) != data_len:
        return None
    body = result[:-1]
    checksum = (data_len + sum(bytearray(body))) % 256
    if bytearray(result)[-1] != checksum:
        self.log("Invalid checksum from serial port")
        return None
    return body
def _FlushInput(self):
    """Flush all read data until no more available.

    After resetting the port's input buffer, the port is polled with a
    zero-timeout select() and drained one byte at a time until it is no
    longer readable. Stops early, after logging, if select() reports an
    exceptional condition on the port.
    """
    port = self.ser
    port.flushInput()
    self.log("Flushing input...", debug=True)
    discarded = 0
    while True:
        readable, _writable, errored = select.select([port], [], [port], 0)
        if errored:
            self.log("exception from serial port")
            return None
        if not readable:
            break
        discarded += 1
        port.read(1)  # This may cause underlying buffering.
        port.flush()  # Flush the underlying buffer too.
    if discarded > 0:
        self.log("flushed >%d bytes" % discarded, debug=True)