普通文本  |  101行  |  3.62 KB

# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

from autotest_lib.client.bin import test, utils
from autotest_lib.client.common_lib import error

import dbus, dbus.mainloop.glib, gobject

class cellular_GobiSwitchCarrier(test.test):
    version = 1

    def fail(self, msg):
        print 'Failed: %s' % msg
        self.failed = error.TestFail(msg)
        self.loop.quit()

    def device_added(self, dev, *args, **kwargs):
        print 'Device added: %s' % dev
        self.modem = self.bus.get_object(self.CMM, dev)
        carrier = self.get_carrier()
        if not carrier:
            print 'No carrier.'
            return
        if carrier != self.to_carrier:
            self.fail('Wrong carrier: %s != %s' % (carrier, self.to_carrier))
        if not self.carriers:
            self.loop.quit() # success!
            return
        while len(self.carriers):
            try:
                self.to_carrier = self.carriers[0]
                self.carriers = self.carriers[1:]
                self.set_carrier(self.to_carrier)
                break
            except dbus.exceptions.DBusException, e:
                if e.get_dbus_message() == "Unknown carrier name":
                    print 'Ignoring invalid carrier %s' % self.to_carrier
                    continue
                raise

    def device_removed(self, *args, **kwargs):
        print 'Device removed.'

    def waitfor(self, signame, fn):
        print 'Waiting for %s' % signame
        self.bus.add_signal_receiver(fn, signal_name=signame,
                                     dbus_interface=self.IMM)

    def timeout(self):
        self.fail('Timeout')

    def get_carrier(self):
        status = self.modem.GetStatus(dbus_interface=self.IMODEM_SIMPLE)
        if not status or not 'carrier' in status:
            self.fail('Bogus GetStatus reply: %s' % status)
            return None
        return status['carrier']

    def set_carrier(self, c):
        print 'Switch: ? -> %s' % c
        self.modem.SetCarrier(c, dbus_interface=self.IMODEM_GOBI)

    def find_modem(self):
        modems = self.mm.EnumerateDevices(dbus_interface=self.IMM)
        if modems:
            self.modem = self.bus.get_object(self.CMM, modems[0])
        else:
            self.modem = None

    def run_once(self, start_carrier='Verizon Wireless',
                 carriers=None,
                 timeout_secs=90):
        carriers = carriers or ['Vodafone', 'Sprint', 'Verizon Wireless']
        self.CMM = 'org.chromium.ModemManager'
        self.IMM = 'org.freedesktop.ModemManager'
        self.IMODEM_SIMPLE = self.IMM + '.Modem.Simple'
        self.IMODEM_GOBI = 'org.chromium.ModemManager.Modem.Gobi'
        self.failed = None
        self.carriers = carriers
        self.to_carrier = None
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
        self.loop = gobject.MainLoop()
        self.bus = dbus.SystemBus()
        gobject.timeout_add(timeout_secs * 1000, self.timeout)
        self.mm = self.bus.get_object(self.CMM, '/org/chromium/ModemManager')
        self.find_modem()
        self.waitfor('DeviceRemoved', self.device_removed)
        self.waitfor('DeviceAdded', self.device_added)
        carrier = self.get_carrier()
        if not carrier:
            raise self.failed
        self.to_carrier = carrier
        self.device_added(self.modem.__dbus_object_path__) # start test
        self.loop.run()
        self.find_modem()
        if self.modem and self.to_carrier != carrier:
            self.set_carrier(carrier)
        if self.failed:
            raise self.failed