普通文本  |  359行  |  15.19 KB

# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import dbus
import logging
import os
import time

from autotest_lib.client.bin import test
from autotest_lib.client.bin import utils
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cellular import mm1_constants
from autotest_lib.client.cros.cellular.pseudomodem import pm_constants
from autotest_lib.client.cros.cellular.pseudomodem import pseudomodem_context
from autotest_lib.client.cros.networking import cellular_proxy

# Used for software message propagation latencies.
SHORT_TIMEOUT_SECONDS = 2
STATE_MACHINE_SCAN = 'ScanMachine'
TEST_MODEMS_MODULE_PATH = os.path.join(os.path.dirname(__file__), 'files',
                                       'modems.py')

class cellular_ScanningProperty(test.test):
    """
    Test that the |Scanning| Property of the shill cellular device object is
    updated correctly in the following two scenarios:
      (1) When a user requests a network scan using the |ProposeScan| method of
          the cellular device.
      (2) During the initial modem enable-register-connect sequence.

    """
    version = 1

    def _find_mm_modem(self):
        """
        Find the modemmanager modem object.

        Assumption: There is only one modem in the system.

        @raises: TestError unless exactly one modem is found.

        """
        object_manager = dbus.Interface(
                self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
                                     mm1_constants.MM1),
                mm1_constants.I_OBJECT_MANAGER)
        try:
            modems = object_manager.GetManagedObjects()
        except dbus.exceptions.DBusException as e:
            raise error.TestFail('Failed to list the available modems. '
                                 'DBus error: |%s|', repr(e))
        if len(modems) != 1:
            raise error.TestFail('Expected one modem object, found %d' %
                                 len(modems))

        modem_path = modems.keys()[0]
        modem_object = self._bus.get_object(mm1_constants.I_MODEM_MANAGER,
                                            modem_path)
        # Check that this object is valid
        try:
            modem_object.GetAll(mm1_constants.I_MODEM,
                                dbus_interface=mm1_constants.I_PROPERTIES)
        except dbus.exceptions.DBusException as e:
            raise error.TestFail('Failed to obtain dbus object for the modem '
                                 'DBus error: |%s|', repr(e))

        return dbus.Interface(modem_object, mm1_constants.I_MODEM)


    def _check_mm_state(self, modem, states):
        """
        Verify that the modemmanager state is |state|.

        @param modem: A DBus object for the modemmanager modem.
        @param states: The expected state of the modem. This is either a single
                state, or a list of states.
        @raises: TestError if the state differs.
        """
        if not isinstance(states, list):
            states = [states]
        properties = modem.GetAll(mm1_constants.I_MODEM,
                                  dbus_interface=mm1_constants.I_PROPERTIES)
        actual_state = properties[mm1_constants.MM_MODEM_PROPERTY_NAME_STATE]
        if actual_state not in states:
            state_names = [mm1_constants.ModemStateToString(x) for x in states]
            raise error.TestFail(
                    'Expected modemmanager modem state to be one of %s but '
                    'found %s' %
                    (state_names,
                     mm1_constants.ModemStateToString(actual_state)))


    def _check_shill_property_update(self, cellular_device, property_name,
                                     old_state, new_state):
        """
        Check the value of property of shill.

        @param cellular_device: The DBus proxy object for the cellular device.
        @param property_name: Name of the property to check.
        @param old_state: old value of property.
        @param new_state: new expected value of property.
        @raises: TestError if the property fails to enter the given state.

        """
        # If we don't expect a change in the value, there is a race between this
        # check and a possible (erronous) update of the value. Allow some time
        # for the property to be updated before checking.
        if old_state == new_state:
            time.sleep(SHORT_TIMEOUT_SECONDS)
            polling_timeout = 0
        else:
            polling_timeout = SHORT_TIMEOUT_SECONDS
        success, _, _ = self._cellular_proxy.wait_for_property_in(
                cellular_device,
                property_name,
                (new_state,),
                timeout_seconds=polling_timeout)
        if not success:
            raise error.TestFail('Shill failed to set |%s| to %s.' %
                                 (property_name, str(new_state)))


    def _itesting_machine(self, machine_name, timeout=SHORT_TIMEOUT_SECONDS):
        """
        Get the testing interface of the given interactive state machine.

        @param machine_name: The name of the interactive state machine.
        @return dbus.Interface for the testing interface of
                InteractiveScanningMachine, if found. None otherwise.
        @raises utils.TimeoutError if a valid dbus object can't be found.

        """
        def _get_machine():
            machine = self._bus.get_object(
                    mm1_constants.I_MODEM_MANAGER,
                    '/'.join([pm_constants.TESTING_PATH, machine_name]))
            if machine:
                i_machine = dbus.Interface(machine, pm_constants.I_TESTING_ISM)
                # Only way to know if this DBus object is valid is to call a
                # method on it.
                try:
                    i_machine.IsWaiting()  # Ignore result.
                    return i_machine
                except dbus.exceptions.DBusException as e:
                    logging.debug(e)
                    return None

        utils.poll_for_condition(_get_machine, timeout=timeout)
        return _get_machine()


    def test_user_initiated_cellular_scan(self):
        """
        Test that the |ProposeScan| DBus method exported by shill cellular
        object correctly updates the cellular object |Scanning| property while
        the scan is in progress.
        """
        with pseudomodem_context.PseudoModemManagerContext(
                True,
                {'test-module' : TEST_MODEMS_MODULE_PATH,
                 'test-modem-class' : 'AsyncScanModem'}):
            self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy()
            self._bus = dbus.SystemBus()
            self._cellular_proxy.set_logging_for_cellular_test()

            logging.info('Sanity check initial values')
            utils.poll_for_condition(
                    self._cellular_proxy.find_cellular_device_object,
                    exception=error.TestFail(
                            'Bad initial state: Failed to obtain a cellular '
                            'device in pseudomodem context.'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            device = self._cellular_proxy.find_cellular_device_object()
            try:
                self._itesting_machine(STATE_MACHINE_SCAN, 0)
                raise error.TestFail('Bad initial state: scan machine created '
                                     'by pseudomodem before scan is proposed.')
            except utils.TimeoutError:
                pass

            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    False,
                    False)

            logging.info('Test actions and checks')
            device.ProposeScan()
            try:
                itesting_scan_machine = self._itesting_machine(
                        STATE_MACHINE_SCAN)
            except utils.TimeoutError:
                raise error.TestFail('Pseudomodem failed to launch %s' %
                                     STATE_MACHINE_SCAN)
            utils.poll_for_condition(
                    itesting_scan_machine.IsWaiting,
                    exception=error.TestFail('Scan machine failed to enter '
                                             'scan state'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    False,
                    True)

            itesting_scan_machine.Advance()
            utils.poll_for_condition(
                    lambda: not itesting_scan_machine.IsWaiting(),
                    exception=error.TestFail('Scan machine failed to exit '
                                             'scan state'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    True,
                    False)


    def test_activated_service_states(self):
        """
        Test that shill |Scanning| property is updated correctly when an
        activated 3GPP service connects.
        """
        with pseudomodem_context.PseudoModemManagerContext(
                True,
                {'test-module' : TEST_MODEMS_MODULE_PATH,
                 'test-state-machine-factory-class' :
                        'InteractiveStateMachineFactory'}):
            self._cellular_proxy = cellular_proxy.CellularProxy.get_proxy()
            self._bus = dbus.SystemBus()
            self._cellular_proxy.set_logging_for_cellular_test()

            logging.info('Sanity check initial values')
            enable_machine = self._itesting_machine(
                    pm_constants.STATE_MACHINE_ENABLE)
            utils.poll_for_condition(
                    enable_machine.IsWaiting,
                    exception=error.TestFail(
                            'Bad initial state: Pseudomodem did not launch '
                            'Enable machine'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            utils.poll_for_condition(
                    self._cellular_proxy.find_cellular_device_object,
                    exception=error.TestFail(
                            'Bad initial state: Failed to obtain a cellular '
                            'device in pseudomodem context.'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            device = self._cellular_proxy.find_cellular_device_object()
            mm_modem = self._find_mm_modem()

            logging.info('Test Connect sequence')
            self._check_mm_state(mm_modem,
                                 mm1_constants.MM_MODEM_STATE_DISABLED)
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_POWERED,
                    False,
                    False)
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    False,
                    False)
            logging.info('Expectation met: |Scanning| is False in MM state '
                         'Disabled')
            enable_machine.Advance()

            # MM state: Enabling
            utils.poll_for_condition(
                    enable_machine.IsWaiting,
                    exception=error.TestFail('EnableMachine failed to wait in '
                                             'Enabling state'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            self._check_mm_state(mm_modem,
                                 mm1_constants.MM_MODEM_STATE_ENABLING)
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    False,
                    True)
            logging.info('Expectation met: |Scanning| is True in MM state '
                         'Enabling')
            enable_machine.Advance()

            # MM state: Enabled
            utils.poll_for_condition(
                    enable_machine.IsWaiting,
                    exception=error.TestFail('EnableMachine failed to wait in '
                                             'Enabled state'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            # Finish the enable call.
            enable_machine.Advance()

            self._check_mm_state(mm_modem, mm1_constants.MM_MODEM_STATE_ENABLED)
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_POWERED,
                    False,
                    True)
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    True,
                    True)

            register_machine = self._itesting_machine(
                    pm_constants.STATE_MACHINE_REGISTER)
            utils.poll_for_condition(
                    register_machine.IsWaiting,
                    exception=error.TestFail('SearchingMachine failed to wait '
                                             'in Enabled state'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            logging.info('Expectation met: |Scanning| is True in MM state '
                         'Enabled')
            register_machine.Advance()

            # MM state: Searching
            utils.poll_for_condition(
                    register_machine.IsWaiting,
                    exception=error.TestFail('SearchingMachine failed to wait '
                                             'in Searching state'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            self._check_mm_state(mm_modem,
                                 mm1_constants.MM_MODEM_STATE_SEARCHING)
            enable_machine.Advance()
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    True,
                    True)
            logging.info('Expectation met: |Scanning| is True in MM state '
                         'Searching')
            register_machine.Advance()

            # MM state: >= Registered
            utils.poll_for_condition(
                    self._cellular_proxy.find_cellular_service_object,
                    error.TestFail('Failed to create Cellular Service for a '
                                   'registered modem'),
                    timeout=SHORT_TIMEOUT_SECONDS)
            self._check_mm_state(mm_modem,
                                 [mm1_constants.MM_MODEM_STATE_REGISTERED,
                                  mm1_constants.MM_MODEM_STATE_CONNECTING,
                                  mm1_constants.MM_MODEM_STATE_CONNECTED])
            self._check_shill_property_update(
                    device,
                    self._cellular_proxy.DEVICE_PROPERTY_SCANNING,
                    True,
                    False)
            logging.info('Expectation met: |Scanning| is False in MM state '
                         'Registered')


    def run_once(self):
        """ Autotest entry function """
        self.test_user_initiated_cellular_scan()
        self.test_activated_service_states()