# Plain text  |  434 lines  |  15.42 KB

#!/usr/bin/python

# Copyright (C) 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import fcntl
import logging
logging.getLogger().setLevel(logging.ERROR)

import os.path
import select
import stat
import struct
import sys
import time
import collections
import socket
import glob
import signal
import serial           # http://pyserial.sourceforge.net/

# When True, Power_Monitor.log() sends non-debug messages to logging.error();
# otherwise every message passed to log() goes to logging.debug().
LOG_TO_SCREEN = False

# Read timeout handed to serial.Serial() whenever a port is opened.
TIMEOUT_SERIAL = 1 #seconds

# Ignore SIGCONT ("continue") signals.
for signum in [signal.SIGCONT]:
  signal.signal(signum, signal.SIG_IGN)

# The relative import needs this module to be loaded as part of its package;
# when that fails the process exits with a hint on how to start the tests.
# NOTE(review): the bare except turns *any* failure raised while importing
# (not only "not in a package") into the same exit message.
try:
  from . import Abstract_Power_Monitor
except:
  sys.exit("You cannot run 'monsoon.py' directly.  Run 'execut_power_tests.py' instead.")
  
class Power_Monitor(Abstract_Power_Monitor):
  """
  Provides a simple class to use the power meter, e.g.
  mon = monsoon.Power_Monitor()
  mon.SetVoltage(3.7)
  mon.StartDataCollection()
  mydata = []
  while len(mydata) < 1000:
    mydata.extend(mon.CollectData())
  mon.StopDataCollection()
  """
  _do_log = False

  @staticmethod
  def lock( device ):
      tmpname = "/tmp/monsoon.%s.%s" % ( os.uname()[0],
                                         os.path.basename(device))
      lockfile = open(tmpname, "w")
      try:  # use a lockfile to ensure exclusive access
          fcntl.lockf(lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
          logging.debug("Locked device %s"%device)
      except IOError as e:
          self.log("device %s is in use" % dev)
          sys.exit('device in use')
      return lockfile
  
  def to_string(self):
      return self._devicename
  
  def __init__(self, device=None, wait=False, log_file_id=None):
    """
    Establish a connection to a Power_Monitor.

    device: path of the serial port to use (e.g. "/dev/ttyACM0"), or an
      already constructed serial.Serial object, which is then adopted and
      locked under its port name.  When omitted, Discover() is used and the
      single monitor it finds is selected.
    wait: only consulted when device is omitted.  If true, discovery is
      retried once a second until a monitor shows up; if false the process
      exits through sys.exit() when none is found.
    log_file_id: when not None, messages given to log() are also appended to
      /tmp/monsoon_<sysname>_<port basename>.<log_file_id>.log.

    Exits through sys.exit() when the port lock cannot be taken, when
    discovery finds no monitor (wait false) or more than one monitor.
    Raises IOError when the device does not answer the status request.
    Errors raised while opening the serial port are logged and re-raised.

    Side effect: installs handle_signal as handler for the signals listed
    below.
    """
    self._lockfile = None
    self._logfile = None
    self.ser = None
    for signum in [signal.SIGALRM, signal.SIGHUP, signal.SIGINT,
                   signal.SIGILL, signal.SIGQUIT,
                   signal.SIGTRAP, signal.SIGABRT, signal.SIGIOT, signal.SIGBUS,
                   signal.SIGFPE, signal.SIGSEGV, signal.SIGUSR2, signal.SIGPIPE,
                   signal.SIGTERM]:
      signal.signal(signum, self.handle_signal)

    self._coarse_ref = self._fine_ref = self._coarse_zero = self._fine_zero = 0
    self._coarse_scale = self._fine_scale = 0
    self._last_seq = 0
    self.start_voltage = 0

    if device:
      if isinstance(device, serial.Serial):
        # Adopt the caller's port object.  From here on "device" is the
        # port *name*, which is what lock() and the log file name need.
        self.ser = device
        device = device.port
    else:
      device_list = Power_Monitor.Discover()
      while not device_list:
        if not wait:
          logging.error("No power monitor serial devices found.  Exiting")
          self.Close()
          sys.exit("No power monitor serial devices found")
        time.sleep(1.0)
        logging.info("No power monitor serial devices found.  Retrying...")
        device_list = Power_Monitor.Discover()

      if len(device_list) > 1:
        logging.error("=======================================")
        logging.error("More than one power monitor discovered!")
        logging.error("Test may not execute properly.Aborting test.")
        logging.error("=======================================")
        sys.exit("More than one power monitor connected.")
      device = device_list[0].to_string()  # the only one found
      logging.info("Power monitor @ %s" % device)
      # Discover() returns live monitors that hold the port and its lock
      # file.  Close them *before* taking our own lock: if they were closed
      # later (by garbage collection), closing their lock file would also
      # drop the fcntl lock this process holds on the same path.
      for found in device_list:
        found.Close()

    self._lockfile = Power_Monitor.lock(device)
    if log_file_id is not None:
      self._logfilename = "/tmp/monsoon_%s_%s.%s.log" % (os.uname()[0],
                                                        os.path.basename(device),
                                                        log_file_id)
      self._logfile = open(self._logfilename, 'a')
    else:
      self._logfile = None
    if self.ser is None:
      try:
        self.ser = serial.Serial(device, timeout=TIMEOUT_SERIAL)
      except Exception as e:
        self.log("error opening device %s: %s" % (device, e))
        self._lockfile.close()
        raise
    logging.debug("Setting up power monitor...")
    self._devicename = device
    # just in case, stop any active data collection on monsoon; the flag is
    # forced to True so that StopDataCollection() also drains pending data
    self._dataCollectionActive = True
    self.StopDataCollection()
    logging.debug("Flushing input...")
    self._FlushInput()  # discard stale input
    logging.debug("Getting status....")
    status = self.GetStatus()

    if not status:
      self.log("no response from device %s" % device)
      self._lockfile.close()
      raise IOError("Failed to get status from device")
    self.start_voltage = status["voltage1"]
    
  def __del__(self):
    self.Close()

  def Close(self):
    if self._logfile:
      print("=============\n"+\
            "Power Monitor log file can be found at '%s'"%self._logfilename +
            "=============\n")
      self._logfile.close()
      self._logfile = None
    if (self.ser):
      #self.StopDataCollection()
      self.ser.flush()
      self.ser.close()
      self.ser = None
    if self._lockfile:
      self._lockfile.close()

  def log(self, msg , debug = False):
    if self._logfile: self._logfile.write( msg + "\n")
    if not debug and LOG_TO_SCREEN:
      logging.error( msg )
    else:
      logging.debug(msg)

  def handle_signal( self, signum, frame):
    if self.ser:
      self.ser.flush()
      self.ser.close()
      self.ser = None
    self.log("Got signal %d"%signum)
    sys.exit("\nGot signal %d\n"%signum)
    
  @staticmethod
  def Discover():
    monsoon_list = []
    elapsed = 0
    logging.info("Discovering power monitor(s)...")
    ser_device_list = glob.glob("/dev/ttyACM*")
    logging.info("Seeking devices %s"%ser_device_list)
    for dev in ser_device_list:
        try:
            lockfile = Power_Monitor.lock( dev )
        except:
            logging.info( "... device %s in use, skipping"%dev)
            continue
        tries = 0
        ser = None
        while ser is None and tries < 100:
             try:  # try to open the device
                ser = serial.Serial( dev, timeout=TIMEOUT_SERIAL)
             except Exception as e:
                logging.error(  "error opening device %s: %s" % (dev, e) )
                tries += 1
                time.sleep(2);
                ser = None
        logging.info("... found device %s"%dev)
        lockfile.close()#will be re-locked once monsoon instance created
        logging.debug("unlocked")
        if not ser:
            continue
        if ser is not None:
            try:
                monsoon = Power_Monitor(device = dev)
                status = monsoon.GetStatus()
                
                if not status:
                    monsoon.log("... no response from device %s, skipping")
                    continue
                else:
                    logging.info("... found power monitor @ %s"%dev)
                    monsoon_list.append( monsoon )
            except:
                import traceback
                traceback.print_exc()
                logging.error("... %s appears to not be a monsoon device"%dev)
    logging.debug("Returning list of %s"%monsoon_list)
    return monsoon_list

  def GetStatus(self):
    """ Requests and waits for status.  Returns status dictionary. """

    # status packet format
    self.log("Getting status...", debug = True)
    STATUS_FORMAT = ">BBBhhhHhhhHBBBxBbHBHHHHBbbHHBBBbbbbbbbbbBH"
    STATUS_FIELDS = [
        "packetType", "firmwareVersion", "protocolVersion",
        "mainFineCurrent", "usbFineCurrent", "auxFineCurrent", "voltage1",
        "mainCoarseCurrent", "usbCoarseCurrent", "auxCoarseCurrent", "voltage2",
        "outputVoltageSetting", "temperature", "status", "leds",
        "mainFineResistor", "serialNumber", "sampleRate",
        "dacCalLow", "dacCalHigh",
        "powerUpCurrentLimit", "runTimeCurrentLimit", "powerUpTime",
        "usbFineResistor", "auxFineResistor",
        "initialUsbVoltage", "initialAuxVoltage",
        "hardwareRevision", "temperatureLimit", "usbPassthroughMode",
        "mainCoarseResistor", "usbCoarseResistor", "auxCoarseResistor",
        "defMainFineResistor", "defUsbFineResistor", "defAuxFineResistor",
        "defMainCoarseResistor", "defUsbCoarseResistor", "defAuxCoarseResistor",
        "eventCode", "eventData", ]

    self._SendStruct("BBB", 0x01, 0x00, 0x00)
    while True:  # Keep reading, discarding non-status packets
      bytes = self._ReadPacket()
      if not bytes: return None
      if len(bytes) != struct.calcsize(STATUS_FORMAT) or bytes[0] != "\x10":
        self.log("wanted status, dropped type=0x%02x, len=%d" % (
                ord(bytes[0]), len(bytes)))
        continue

      status = dict(zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, bytes)))
      assert status["packetType"] == 0x10
      for k in status.keys():
        if k.endswith("VoltageSetting"):
          status[k] = 2.0 + status[k] * 0.01
        elif k.endswith("FineCurrent"):
          pass # needs calibration data
        elif k.endswith("CoarseCurrent"):
          pass # needs calibration data
        elif k.startswith("voltage") or k.endswith("Voltage"):
          status[k] = status[k] * 0.000125
        elif k.endswith("Resistor"):
          status[k] = 0.05 + status[k] * 0.0001
          if k.startswith("aux") or k.startswith("defAux"): status[k] += 0.05
        elif k.endswith("CurrentLimit"):
          status[k] = 8 * (1023 - status[k]) / 1023.0
      #self.log( "Returning requested status: \n %s"%(status), debug = True)
      return status

  def RampVoltage(self, start, end):
    v = start
    if v < 3.0: v = 3.0       # protocol doesn't support lower than this
    while (v < end):
      self.SetVoltage(v)
      v += .1
      time.sleep(.1)
    self.SetVoltage(end)

  def SetVoltage(self, v):
    """ Set the output voltage, 0 to disable. """
    self.log("Setting voltage to %s..."%v, debug = True)
    if v == 0:
      self._SendStruct("BBB", 0x01, 0x01, 0x00)
    else:
      self._SendStruct("BBB", 0x01, 0x01, int((v - 2.0) * 100))
    self.log("...Set voltage", debug = True)

  def SetMaxCurrent(self, i):
    """Set the max output current."""
    assert i >= 0 and i <= 8
    self.log("Setting max current to %s..."%i, debug = True)
    val = 1023 - int((i/8)*1023)
    self._SendStruct("BBB", 0x01, 0x0a, val & 0xff)
    self._SendStruct("BBB", 0x01, 0x0b, val >> 8)
    self.log("...Set max current.", debug = True)
    
  def SetUsbPassthrough(self, val):
    """ Set the USB passthrough mode: 0 = off, 1 = on,  2 = auto. """
    self._SendStruct("BBB", 0x01, 0x10, val)

  def StartDataCollection(self):    
    """ Tell the device to start collecting and sending measurement data. """
    self.log("Starting data collection...", debug = True)
    self._SendStruct("BBB", 0x01, 0x1b, 0x01) # Mystery command
    self._SendStruct("BBBBBBB", 0x02, 0xff, 0xff, 0xff, 0xff, 0x03, 0xe8)
    self.log("...started", debug = True)
    self._dataCollectionActive = True
    
  def StopDataCollection(self):
    """ Tell the device to stop collecting measurement data. """
    self._SendStruct("BB", 0x03, 0x00) # stop
    if self._dataCollectionActive:
      while self.CollectData(False) is not None:
        pass
    self._dataCollectionActive = False
    
  def CollectData(self, verbose = True):
    """ Return some current samples.  Call StartDataCollection() first. """
    #self.log("Collecting data ...", debug = True)
    while True:  # loop until we get data or a timeout
      bytes = self._ReadPacket(verbose)
      
      if not bytes: return None
      if len(bytes) < 4 + 8 + 1 or bytes[0] < "\x20" or bytes[0] > "\x2F":
        if verbose: self.log( "wanted data, dropped type=0x%02x, len=%d" % (
          ord(bytes[0]), len(bytes)), debug=verbose)
        continue

      seq, type, x, y = struct.unpack("BBBB", bytes[:4])
      data = [struct.unpack(">hhhh", bytes[x:x+8])
              for x in range(4, len(bytes) - 8, 8)]

      if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF:
        self.log( "data sequence skipped, lost packet?" )
      self._last_seq = seq

      if type == 0:
        if not self._coarse_scale or not self._fine_scale:
          self.log("waiting for calibration, dropped data packet")
          continue

        out = []
        for main, usb, aux, voltage in data:
          if main & 1:
            out.append(((main & ~1) - self._coarse_zero) * self._coarse_scale)
          else:
            out.append((main - self._fine_zero) * self._fine_scale)
        #self.log("...Collected %d samples"%(len(out)), debug = True)
        return out

      elif type == 1:
        self._fine_zero = data[0][0]
        self._coarse_zero = data[1][0]

      elif type == 2:
        self._fine_ref = data[0][0]
        self._coarse_ref = data[1][0]

      else:
        self.log( "discarding data packet type=0x%02x" % type)
        continue

      if self._coarse_ref != self._coarse_zero:
        self._coarse_scale = 2.88 / (self._coarse_ref - self._coarse_zero)
      if self._fine_ref != self._fine_zero:
        self._fine_scale = 0.0332 / (self._fine_ref - self._fine_zero)


  def _SendStruct(self, fmt, *args):
    """ Pack a struct (without length or checksum) and send it. """
    data = struct.pack(fmt, *args)
    data_len = len(data) + 1
    checksum = (data_len + sum(struct.unpack("B" * len(data), data))) % 256
    out = struct.pack("B", data_len) + data + struct.pack("B", checksum)
    self.ser.write(out)
    self.ser.flush()

  def _ReadPacket(self, verbose = True):
    """ Read a single data record as a string (without length or checksum). """
    len_char = self.ser.read(1)
    if not len_char:
      if verbose: self.log( "timeout reading from serial port" )
      return None

    data_len = struct.unpack("B", len_char)
    data_len = ord(len_char)
    if not data_len: return ""

    result = self.ser.read(data_len)
    if len(result) != data_len: return None
    body = result[:-1]
    checksum = (data_len + sum(struct.unpack("B" * len(body), body))) % 256
    if result[-1] != struct.pack("B", checksum):
      self.log( "Invalid checksum from serial port" )
      return None
    return result[:-1]

  def _FlushInput(self):
    """ Flush all read data until no more available. """
    self.ser.flushInput()
    flushed = 0
    self.log("Flushing input...", debug = True)
    while True:
      ready_r, ready_w, ready_x = select.select([self.ser], [], [self.ser], 0)
      if len(ready_x) > 0:
        self.log( "exception from serial port" )
        return None
      elif len(ready_r) > 0:
        flushed += 1
        self.ser.read(1)  # This may cause underlying buffering.
        self.ser.flush()  # Flush the underlying buffer too.
      else:
        break
    if flushed > 0:
      self.log( "flushed >%d bytes" % flushed, debug = True )