普通文本  |  144行  |  6.3 KB

# Copyright 2015 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import logging

import common
from autotest_lib.client.bin import utils
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_channel
from autotest_lib.client.cros.cellular.mbim_compliance \
        import mbim_command_message
from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
from autotest_lib.client.cros.cellular.mbim_compliance \
        import mbim_message_request
from autotest_lib.client.cros.cellular.mbim_compliance \
        import mbim_message_response
from autotest_lib.client.cros.cellular.mbim_compliance \
        import mbim_test_base
from autotest_lib.client.cros.cellular.mbim_compliance.sequences \
        import get_descriptors_sequence
from autotest_lib.client.cros.cellular.mbim_compliance.sequences \
        import mbim_open_generic_sequence


class cellular_MbimComplianceCM16(mbim_test_base.MbimTestBase):
    """
    CM_16 Validation of fragmented message transmission in case of multiple
    fragmented messages.

    This test verifies that fragmented messages sent from the function are not
    intermixed. Note that this test is only applicable for devices that support
    multiple outstanding commands.

    The test sends a device-caps query and a device-services query back to
    back, collects the responses (matched to the requests by transaction id)
    and asserts that each response is of the expected type and echoes the
    transaction id, device service id and CID of its request.

    Reference:
        [1] Universal Serial Bus Communication Class MBIM Compliance Testing: 44
        http://www.usb.org/developers/docs/devclass_docs/MBIM-Compliance-1.0.pdf
    """
    version = 1

    def run_internal(self):
        """
        Run CM_16 test.

        State written on |self| for use by |_get_response_packets|:
            channel: The MBIMChannel used to talk to the device.
            caps_transaction_id: Transaction id of the caps query.
            services_transaction_id: Transaction id of the services query.
            caps_response / services_response: The parsed responses, or None
                    while the matching response has not been seen yet.

        @raises MBIMComplianceChannelError (supplied to |poll_for_condition|
                as the exception for the timeout case) if both responses are
                not retrieved within the 5 second polling window.
        @raises MBIMComplianceAssertionError (via |log_and_raise|) if the
                responses do not satisfy assertion mbim1.0:9.5#1.

        """
        # Precondition
        desc_sequence = get_descriptors_sequence.GetDescriptorsSequence(
                self.device_context)
        descriptors = desc_sequence.run()
        self.device_context.update_descriptor_cache(descriptors)
        open_sequence = mbim_open_generic_sequence.MBIMOpenGenericSequence(
                self.device_context)
        # NOTE(review): the small transfer size is presumably chosen so that
        # the responses have to be fragmented -- confirm against the spec.
        open_sequence.run(max_control_transfer_size=64)

        device_context = self.device_context
        descriptor_cache = device_context.descriptor_cache
        self.channel = mbim_channel.MBIMChannel(
                device_context.device,
                descriptor_cache.mbim_communication_interface.bInterfaceNumber,
                descriptor_cache.interrupt_endpoint.bEndpointAddress,
                device_context.max_control_transfer_size)

        # Start from a clean slate. The responses are accumulated across
        # polls by |_get_response_packets|, so they must be reset exactly
        # once here rather than on every poll.
        self.caps_response = None
        self.services_response = None

        # Everything between opening and closing the channel is guarded so
        # that the channel is released even when a step below raises.
        try:
            # Step 1
            caps_command_message = mbim_command_message.MBIMDeviceCapsQuery()
            caps_packets = mbim_message_request.generate_request_packets(
                    caps_command_message,
                    device_context.max_control_transfer_size)
            self.caps_transaction_id = caps_command_message.transaction_id

            # Step 2
            services_command_message = (
                    mbim_command_message.MBIMDeviceServicesQuery())
            services_packets = mbim_message_request.generate_request_packets(
                    services_command_message,
                    device_context.max_control_transfer_size)
            self.services_transaction_id = (
                    services_command_message.transaction_id)

            # Transmit the messages now. Both requests are sent before any
            # response is read, so that two commands are outstanding at once.
            self.channel.unidirectional_transaction(*caps_packets)
            self.channel.unidirectional_transaction(*services_packets)

            # Step 3
            utils.poll_for_condition(
                    self._get_response_packets,
                    timeout=5,
                    exception=mbim_errors.MBIMComplianceChannelError(
                            'Failed to retrieve the response packets to '
                            'specific control messages.'))
        finally:
            self.channel.close()

        caps_response_message = self.caps_response
        services_response_message = self.services_response
        is_caps_message_valid = isinstance(
                caps_response_message,
                mbim_command_message.MBIMDeviceCapsInfo)
        is_services_message_valid = isinstance(
                services_response_message,
                mbim_command_message.MBIMDeviceServicesInfo)
        # The type checks come first so that the attribute comparisons below
        # are only evaluated on responses of the expected type.
        if not ((is_caps_message_valid and is_services_message_valid) and
                (caps_response_message.transaction_id ==
                 caps_command_message.transaction_id) and
                (caps_response_message.device_service_id ==
                 caps_command_message.device_service_id) and
                caps_response_message.cid == caps_command_message.cid and
                (services_command_message.transaction_id ==
                 services_response_message.transaction_id) and
                (services_command_message.device_service_id ==
                 services_response_message.device_service_id) and
                services_command_message.cid == services_response_message.cid):
            mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError,
                                      'mbim1.0:9.5#1')


    def _get_response_packets(self):
        """
        Condition method for |poll_for_condition| to check the retrieval of
        target packets.

        Parses every outstanding packet from the channel and stores the
        responses whose transaction id matches one of the two requests in
        |self.caps_response| / |self.services_response|. Responses found by
        an earlier call are kept, so the two responses may arrive in
        different polls. Responses with any other transaction id are ignored.

        NOTE(review): this assumes |get_outstanding_packets| does not hand
        out the same packet again on a later call -- confirm against
        mbim_channel.

        @returns True if both caps response packet and services response packet
                are received, False otherwise.
        @raises MBIMComplianceAssertionError (via |log_and_raise|) if the
                channel reports an error or a response cannot be parsed.

        """
        try:
            packets = self.channel.get_outstanding_packets()
        except mbim_errors.MBIMComplianceChannelError:
            logging.debug(
                    "Error in receiving response fragments from the device")
            mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError,
                                      'mbim1.0:9.5#1')
        for packet in packets:
            try:
                message_response = mbim_message_response.parse_response_packets(
                        packet)
            except mbim_errors.MBIMComplianceControlMessageError:
                logging.debug(
                        "Error in parsing response fragments from the device")
                mbim_errors.log_and_raise(
                        mbim_errors.MBIMComplianceAssertionError,
                        'mbim1.0:9.5#1')
            if message_response.transaction_id == self.caps_transaction_id:
                self.caps_response = message_response
            elif (message_response.transaction_id ==
                  self.services_transaction_id):
                self.services_response = message_response
            # Compare against None explicitly so the result does not depend
            # on the truthiness of the parsed message objects.
            if (self.caps_response is not None and
                    self.services_response is not None):
                return True
        return False