# Copyright (c) 2011 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
import dbus
import glib
import gobject
import logging
import os
import pty
import re
import subprocess
import traceback
from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cellular import mm
from autotest_lib.client.cros.cellular import modem_utils
# Seconds to wait for every expected event; run_once() passes this to Wait(),
# which converts it to milliseconds for the glib timeout.
TEST_TIMEOUT = 120
# Preconditions for starting: StartIfReady() only calls StartTest() once both
# of these have been removed from remaining_start_conditions.
START_DEVICE_PRESENT = 'start_device_present'
START_UDEVADM_RUNNING = 'start_udevadm_running'
# ModemManager service enters, leaves bus (tracked via NameOwnerChanged)
MM_APPEARED = 'modem_manager_appeared'
MM_DISAPPEARED = 'modem_manager_disappeared'
# userlevel UDEV sees the modem come and go (parsed from `udevadm monitor`)
MODEM_APPEARED = 'modem_appeared'
MODEM_DISAPPEARED = 'modem_disappeared'
class TestEventLoop(object):
    """Common tools for running glib event loops.

    Instance state:
      to_raise: the last exception captured by a handler built with
          ExceptionWrapper(), or None if no wrapped handler has failed.
      to_kill: subprocess.Popen objects created through Popen() that have
          not yet been cleaned up by KillSubprocesses().
    """

    def __init__(self):
        # The glib mainloop sinks exceptions thrown by event handlers, so we
        # provide a wrapper that saves the exceptions so the event loop can
        # re-raise them.
        # TODO(rochberg): The rethrown exceptions come with the stack of the
        # rethrow point, not the original exceptions.  Fix.
        self.to_raise = None

        # Autotest won't continue until our children are dead.  Keep track
        # of them.
        self.to_kill = []

    def ExceptionWrapper(self, f):
        """Returns a wrapper that calls f and saves exceptions for re-raising.

        Args:
          f: the callable to protect.

        Returns:
          A callable accepting any arguments.  It returns f's result when f
          succeeds.  When f raises an Exception, the traceback is logged,
          the exception is stored in self.to_raise, and True is returned.
        """
        def to_return(*args, **kwargs):
            try:
                return f(*args, **kwargs)
            except Exception as e:
                logging.info('Caught: ' + traceback.format_exc())
                self.to_raise = e
                return True
        return to_return

    def Popen(self, *args, **kwargs):
        """Builds a subprocess.Popen, saves a copy for later kill()ing.

        All arguments are forwarded unchanged to subprocess.Popen.

        Returns:
          The new subprocess.Popen object.
        """
        to_return = subprocess.Popen(*args, **kwargs)
        self.to_kill.append(to_return)
        return to_return

    def KillSubprocesses(self):
        """Kills and reaps every subprocess started through Popen().

        The tracking list is emptied first, so calling this method again is
        a no-op rather than a second kill() on stale Popen objects.
        """
        victims, self.to_kill = self.to_kill, []
        for victim in victims:
            try:
                victim.kill()
            except OSError:
                # Deliberate best effort: the child is already gone, which
                # is the state we are trying to reach anyway.
                logging.info('Subprocess %d was already dead' % victim.pid)
            # Reap the child so it does not linger as a zombie.
            victim.wait()
class GobiDesyncEventLoop(TestEventLoop):
    """Event loop that waits for the modem and cromo to go away and return.

    Wait() registers DBus and udev listeners, calls StartTest() once the
    start preconditions are met, and then iterates the glib main context
    until every entry of remaining_events has been observed, a handler has
    failed, or the timeout fires.

    Subclasses must provide StartTest(); this class only calls it.
    """

    def __init__(self, test_env):
        """Initializes the bookkeeping for one test run.

        Args:
          test_env: test environment object; this class reads its `bus`
              attribute, subclasses read further attributes.
        """
        super(GobiDesyncEventLoop, self).__init__()
        self.test_env = test_env
        self.dbus_signal_receivers = []

        # Ids of glib sources (timeouts, I/O watches) registered by this
        # loop, so that Wait() can remove them when it finishes.
        self.glib_sources = []

        # Start conditions; once these have been met, call StartTest.
        # This makes sure that cromo and udevadm are ready to use
        self.remaining_start_conditions = set([START_DEVICE_PRESENT,
                                               START_UDEVADM_RUNNING])

        # We want to see all of these events before we're done
        self.remaining_events = set([MM_APPEARED, MM_DISAPPEARED,
                                     MODEM_APPEARED, MODEM_DISAPPEARED, ])

        # udevadm monitor output for user-level notifications of device
        # add and remove
        # UDEV [1296763045.687859] add /devices/virtual/QCQMI/qcqmi0 (QCQMI)
        self.udev_qcqmi = re.compile(
            r'UDEV.*\s(?P<action>\w+).*/QCQMI/qcqmi')

    def NameOwnerChanged(self, name, old, new):
        """Tracks org.chromium.ModemManager leaving and rejoining the bus.

        Args:
          name: bus name whose owner changed; other names are ignored.
          old: previous owner, empty when the name has just appeared.
          new: new owner, empty when the name has just disappeared.

        Returns:
          None for names other than org.chromium.ModemManager, else True.

        Raises:
          Exception: if the service appears before it was seen disappearing.
          KeyError: if the same transition is reported a second time.
        """
        if name != 'org.chromium.ModemManager':
            return
        if not new:
            self.remaining_events.remove(MM_DISAPPEARED)
        elif not old:
            if MM_DISAPPEARED in self.remaining_events:
                raise Exception('Saw cromo appear before it disappeared')
            self.remaining_events.remove(MM_APPEARED)
        return True

    def ModemAdded(self, path):
        """Clock the StartIfReady() state machine when we see a modem added."""
        logging.info('Modem %s added' % path)
        self.StartIfReady()  # Checks to see if the modem is present

    def TimedOut(self):
        """Timeout callback: always raises, listing the events still missing."""
        raise Exception('Timed out: still waiting for: '
                        + str(self.remaining_events))

    def UdevOutputReceived(self, source, condition):
        """Parses a chunk of `udevadm monitor` output.

        Args:
          source: object with a fileno() method to read the output from.
          condition: glib I/O condition flags that triggered the callback.

        Returns:
          True, so that the watch stays installed.

        Raises:
          Exception: if the modem is added before its removal was seen.
          KeyError: if the same add/remove action is reported twice.
        """
        if not condition & glib.IO_IN:
            # Woken without readable data (e.g. IO_HUP alone): there is
            # nothing to parse.
            return True

        output = os.read(source.fileno(), 65536)

        # We don't want to start the test until udevadm is running
        if 'KERNEL - the kernel uevent' in output:
            self.StartIfReady(START_UDEVADM_RUNNING)

        for line in output.split('\r\n'):
            logging.info(line)
            match = self.udev_qcqmi.search(line)
            if not match:
                continue
            action = match.group('action')
            logging.info('Action:[%s]' % action)
            if action == 'add':
                # The modem (not ModemManager) must have been removed first;
                # this is what the error message below describes.
                if MODEM_DISAPPEARED in self.remaining_events:
                    raise Exception('Saw modem appear before it disappeared')
                self.remaining_events.remove(MODEM_APPEARED)
            elif action == 'remove':
                self.remaining_events.remove(MODEM_DISAPPEARED)
        return True

    def StartIfReady(self, condition_to_remove=None):
        """Call StartTest when remaining_start_conditions have been met.

        Args:
          condition_to_remove: optional start condition that the caller has
              just observed to be satisfied.

        Raises:
          dbus.exceptions.DBusException: any DBus error from the modem
              lookup other than org.freedesktop.DBus.Error.NoReply.
        """
        if condition_to_remove:
            self.remaining_start_conditions.discard(condition_to_remove)

        try:
            if (START_DEVICE_PRESENT in self.remaining_start_conditions and
                    mm.PickOneModem('Gobi')):
                self.remaining_start_conditions.discard(START_DEVICE_PRESENT)
        except dbus.exceptions.DBusException as e:
            # A NoReply error is tolerated; the device condition simply
            # stays unmet until a later call.
            if e.get_dbus_name() != 'org.freedesktop.DBus.Error.NoReply':
                raise

        if self.remaining_start_conditions:
            logging.info('Not starting until: %s' %
                         self.remaining_start_conditions)
        else:
            logging.info('Preconditions satisfied')
            self.StartTest()
            # Keep this a set: later calls still use discard() on it.
            self.remaining_start_conditions = set(
                ['dummy entry so we do not start twice'])

    def RegisterDbusSignal(self, *args, **kwargs):
        """Register signal receiver with dbus and our cleanup list."""
        self.dbus_signal_receivers.append(
            self.test_env.bus.add_signal_receiver(*args, **kwargs))

    def CleanupDbusSignalReceivers(self):
        """Removes every receiver added through RegisterDbusSignal()."""
        receivers, self.dbus_signal_receivers = self.dbus_signal_receivers, []
        for signal_match in receivers:
            signal_match.remove()

    def CleanupGlibSources(self):
        """Removes the glib timeout and I/O watch installed by this loop."""
        sources, self.glib_sources = self.glib_sources, []
        for source_id in sources:
            glib.source_remove(source_id)

    def RegisterForDbusSignals(self):
        """Subscribes to the DBus signals this loop reacts to."""
        # Watch cromo leave the bus when it terminates and return when it
        # is restarted
        self.RegisterDbusSignal(self.ExceptionWrapper(self.NameOwnerChanged),
                                bus_name='org.freedesktop.DBus',
                                signal_name='NameOwnerChanged')

        # Wait for cromo to report that the modem is present.
        self.RegisterDbusSignal(self.ExceptionWrapper(self.ModemAdded),
                                bus_name='org.freedesktop.DBus',
                                signal_name='DeviceAdded',
                                dbus_interface='org.freedesktop.ModemManager')

    def RegisterForUdevMonitor(self):
        """Starts `udevadm monitor` and watches its output from glib."""
        # have udevadm output to a pty so it will line buffer
        (master, slave) = pty.openpty()
        self.Popen(['udevadm', 'monitor'],
                   stdout=os.fdopen(slave),
                   bufsize=1)

        watch_id = glib.io_add_watch(
            os.fdopen(master),
            glib.IO_IN | glib.IO_HUP,
            self.ExceptionWrapper(self.UdevOutputReceived))
        self.glib_sources.append(watch_id)

    def Wait(self, timeout_seconds):
        """Runs the test and blocks until all expected events were seen.

        Args:
          timeout_seconds: seconds after which TimedOut() fires.

        Raises:
          Whatever exception a wrapped handler stored in self.to_raise, or
          any exception raised directly by setup or StartTest().  Cleanup
          (fault injection reset, subprocesses, DBus receivers, glib
          sources) is attempted in every case.
        """
        try:
            self.RegisterForDbusSignals()
            self.RegisterForUdevMonitor()
            self.glib_sources.append(
                gobject.timeout_add(timeout_seconds * 1000,
                                    self.ExceptionWrapper(self.TimedOut)))

            # Check to see if the modem is present and remove that from the
            # start preconditions if need be
            self.StartIfReady()

            context = gobject.MainLoop().get_context()
            while self.remaining_events and not self.to_raise:
                logging.info('Waiting for: ' + str(self.remaining_events))
                context.iteration()
        finally:
            try:
                modem_utils.ClearGobiModemFaultInjection()
            finally:
                # Runs even if clearing the fault injection fails.
                self.KillSubprocesses()
                self.CleanupDbusSignalReceivers()
                self.CleanupGlibSources()

        if self.to_raise:
            raise self.to_raise
        logging.info('Done waiting for events')
class RegularOperationTest(GobiDesyncEventLoop):
    """Desync reported by an ordinary modem API call.

    Covers the case where the modem makes an API call that returns a
    "we've lost sync" error that should cause a reboot.
    """

    def __init__(self, test_env):
        super(RegularOperationTest, self).__init__(test_env)

    def StartTest(self):
        """Injects an 'SdkError' fault, then issues a GetStatus() call."""
        gobi_modem = self.test_env.modem.GobiModem()
        gobi_modem.InjectFault('SdkError', 12)

        simple_modem = self.test_env.modem.SimpleModem()
        simple_modem.GetStatus()
class DataConnectTest(GobiDesyncEventLoop):
    """Desync reported while starting a data session.

    Test the special-case code path where we receive an error from
    StartDataSession.  If we're not also disabling at the same time,
    this should behave the same as other desync errors.
    """

    def __init__(self, test_env):
        super(DataConnectTest, self).__init__(test_env)

    def ignore(self, *args, **kwargs):
        """Handler for the Connect() call that only logs it was invoked."""
        logging.info('ignoring')

    def StartTest(self):
        """Injects the connect faults, then starts a Connect() with handlers."""
        modem = self.test_env.modem
        gobi = modem.GobiModem()
        for fault_name, fault_value in (
                ('AsyncConnectSleepMs', 1000),
                ('ConnectFailsWithErrorSendingQmiRequest', 1)):
            gobi.InjectFault(fault_name, fault_value)

        # Reply and error are both routed to ignore(); this test only
        # cares about the events seen by the event loop.
        modem.SimpleModem().Connect(
            {}, reply_handler=self.ignore, error_handler=self.ignore)
class ApiConnectTest(GobiDesyncEventLoop):
    """Test the special-case code on errors connecting to the API."""

    def __init__(self, test_env):
        super(ApiConnectTest, self).__init__(test_env)

    def StartTest(self):
        """Disables the modem, injects a fault, and expects Enable to fail.

        Raises:
          error.TestFail: if Enable(True) returns without a DBusException.
        """
        modem = self.test_env.modem
        modem.Enable(False)

        # Failures on API connect are a different code path
        modem.GobiModem().InjectFault('SdkError', 1)
        try:
            modem.Enable(True)
        except dbus.exceptions.DBusException:
            # Expected outcome: the call must not complete normally.
            pass
        else:
            raise error.TestFail('Enable returned when it should have crashed')
class EnableDisableTest(object):
    """Test that the Enable and Disable technology functions work."""

    def __init__(self, test_env):
        """Stores the test environment.

        Args:
          test_env: object providing the `shill` and `modem` attributes
              read by Test().
        """
        self.test_env = test_env

    def CompareModemPowerState(self, modem, expected_state):
        """Compare the power state of a modem to an expected state.

        Args:
          modem: object whose GetModemProperties() returns a mapping with
              an 'Enabled' entry.
          expected_state: value the 'Enabled' entry is compared against.

        Returns:
          True if the modem's 'Enabled' property equals expected_state.
        """
        state = modem.GetModemProperties()['Enabled']
        logging.info('Modem Enabled = %s' % state)
        return state == expected_state

    def CompareDevicePowerState(self, device, expected_state):
        """Compare the shill device power state to an expected state.

        Args:
          device: object whose GetProperties(utf8_strings=True) returns a
              mapping with a 'Powered' entry.
          expected_state: value the 'Powered' entry is compared against.

        Returns:
          True if the device's 'Powered' property equals expected_state.
        """
        state = device.GetProperties(utf8_strings=True)['Powered']
        logging.info('Device Enabled = %s' % state)
        return state == expected_state

    def _SetTechnologyPower(self, device, powered):
        """Switches the cellular technology and checks device and modem.

        Args:
          device: shill cellular device object to poll.
          powered: True to enable the technology, False to disable it.

        Raises:
          error.TestFail: if the device does not reach the requested state,
              or the modem is not in that state once the device is.
        """
        manager = self.test_env.shill.manager
        if powered:
            manager.EnableTechnology('cellular')
        else:
            manager.DisableTechnology('cellular')

        utils.poll_for_condition(
            lambda: self.CompareDevicePowerState(device, powered),
            error.TestFail(
                'Device Failed to enter state Powered=%s' % powered))

        # No second wait here: the modem must already have changed by the
        # time shill reports the new device state.
        if not self.CompareModemPowerState(self.test_env.modem, powered):
            raise error.TestFail(
                'Modem Failed to enter state %s' %
                ('Enabled' if powered else 'Disabled'))

    def Test(self):
        """Test that the Enable and Disable technology functions work.

        The expectation is that by using enable technology shill
        will change the power state of the device by requesting that
        the modem manager modem be either Enabled or Disabled.  The
        state tracked by shill should not change until *after* the
        modem state has changed.  Thus after Enabling or Disabling the
        technology, we wait until the shill device state changes,
        and then assert that the modem state has also changed, without
        having to wait again.

        Raises:
          error.TestFail - if the shill device or the modem manager
            modem is not in the expected state
        """
        device = self.test_env.shill.find_cellular_device_object()

        # Run the enable/disable cycle twice.
        for _ in range(2):
            # Enable technology, ensure that device and modem are enabled.
            self._SetTechnologyPower(device, True)
            # Disable technology, ensure that device and modem are disabled.
            self._SetTechnologyPower(device, False)
class cellular_GobiRecoverFromDesync(test.test):
    """Autotest entry point: runs each desync scenario in sequence."""
    version = 1

    def run_test(self, test_env, test):
        """Runs one scenario inside the test environment.

        Args:
          test_env: context manager entered for the duration of the run.
          test: zero-argument callable performing the scenario.

        Fault injection is cleared afterwards whether or not test() raises.
        """
        with test_env:
            try:
                test()
            finally:
                modem_utils.ClearGobiModemFaultInjection()

    def run_once(self, test_env, cycles=1, min=1, max=20):
        """Runs the three desync scenarios, then the enable/disable check.

        Args:
          test_env: test environment handed to every scenario.
          cycles, min, max: accepted but not used by this method.
        """
        scenarios = (
            ('Testing failure during DataConnect',
             lambda: DataConnectTest(test_env).Wait(TEST_TIMEOUT)),
            ('Testing failure while in regular operation',
             lambda: RegularOperationTest(test_env).Wait(TEST_TIMEOUT)),
            ('Testing failure during device initialization',
             lambda: ApiConnectTest(test_env).Wait(TEST_TIMEOUT)),
            ('Testing that Enable and Disable technology still work',
             lambda: EnableDisableTest(test_env).Test()),
        )
        for description, scenario in scenarios:
            logging.info(description)
            self.run_test(test_env, scenario)