# Plain text  |  212 lines  |  7.08 KB

#!/usr/bin/env python

# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import at_channel

import fcntl
import functools
import glib
import logging
import mox
import os
import tempfile
import unittest

import task_loop

class ATChannelTestCase(unittest.TestCase):
    """
    Test fixture for ATChannel class.

    The channel under test is wired to a temporary file instead of a real tty
    pair, and its task loop is replaced by a mox mock, so that the tests can
    inspect what is written to the channel and which commands get posted.

    """

    def setUp(self):
        """
        Create an ATChannel backed by a tempfile and a mocked task loop.

        """
        self.mox = mox.Mox()

        master, slave = os.openpty()
        self._at_channel = at_channel.ATChannel(
                self._recieve_command_local_callback, slave, 'test')

        # Replace the channel inside _at_channel with a tempfile
        # We will use the tempfile to simulate a tty pair.
        os.close(master)
        os.close(slave)
        self._channel_file = tempfile.TemporaryFile(mode='w+')
        # These properties are a copy of the properties set in ATChannel for the
        # tty pair.
        channel_fd = self._channel_file.fileno()
        flags = fcntl.fcntl(channel_fd, fcntl.F_GETFL)
        fcntl.fcntl(channel_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
        self._at_channel._channel = channel_fd
        # We need to seek() to the beginning of the file to simulate tty read.
        # So remember the head of the file.
        self._channel_file_head = self._channel_file.tell()

        # Also mock out the task_loop
        self._mox_task_loop = self.mox.CreateMock(task_loop.TaskLoop)
        self._at_channel._task_loop = self._mox_task_loop


    def tearDown(self):
        """
        Release the tempfile that stands in for the tty.

        """
        self._channel_file.close()

    # ##########################################################################
    # Tests

    def test_successful_send(self):
        """
        Test that a single AT command can be sent on the channel.

        The command is checked once with the channel's initial guard strings
        and once more after overriding at_prefix / at_suffix.

        """
        payload = 'A not so huge AT+CEREG command.'
        self._at_channel.send(payload)
        received_command = self._recieve_command_remote()
        self.assertTrue(received_command.endswith('\r\n'))
        self.assertEqual(payload.strip(), received_command.strip())

        # Change the AT command guard strings and check again.
        prefix = '$$'
        suffix = '##'
        self._at_channel.at_prefix = prefix
        self._at_channel.at_suffix = suffix
        self._at_channel.send(payload)
        received_command = self._recieve_command_remote()
        self.assertTrue(received_command.startswith(prefix))
        self.assertTrue(received_command.endswith(suffix))
        # Slice the guards off explicitly: str.strip() takes a set of
        # characters, not a literal prefix/suffix, so it could also eat
        # matching characters that belong to the payload itself.
        self.assertEqual(payload.strip(),
                         received_command[len(prefix):-len(suffix)])


    def test_recieve_single_at_command(self):
        """
        Test that a single AT command can be received together on the channel.

        """
        payload = 'We send you our AT+good wishes too!\r\n'
        self._verify_received_commands([payload], [payload.strip()])


    def test_receive_at_commands_differet_terminators(self):
        """
        Test that AT commands are received correctly when different supported
        termination strings are being used.

        """
        # ; is a continuation marker. AT1;2 == AT1\r\nAT2
        payloads = ['AT1\r\nA', 'T2\rA', 'T3\nA', 'T4;', '5\r\n']
        expected_commands = ['AT1', 'AT2', 'AT3', 'AT4', 'AT5']
        self._verify_received_commands(payloads, expected_commands)


    def test_recieve_at_commands_in_parts(self):
        """
        Test that multiple AT commands can be received in parts on the
        channel.

        """
        payloads = ['AT1', '11\r\n', '\r\nAT22', '2\r\nAT333', '\r\n']
        expected_commands = ['AT111', 'AT222', 'AT333']
        self._verify_received_commands(payloads, expected_commands)


    def test_recieve_long_at_commands(self):
        """
        Test that AT commands are received correctly when the payloads are
        longer than the read chunk size used by the channel.

        """
        payloads = ['AT1+',
                    '123456789\r\nAT2+123456789\r\nAT3+1234567',
                    '89\r\n']
        expected_commands = ['AT1+123456789',
                             'AT2+123456789',
                             'AT3+123456789']

        # CHANNEL_READ_CHUNK_SIZE is module-level state in at_channel. Restore
        # it afterwards so that the tiny chunk size does not leak into tests
        # that run after this one.
        saved_chunk_size = at_channel.CHANNEL_READ_CHUNK_SIZE
        at_channel.CHANNEL_READ_CHUNK_SIZE = 4
        try:
            self._verify_received_commands(payloads, expected_commands)
        finally:
            at_channel.CHANNEL_READ_CHUNK_SIZE = saved_chunk_size

    # ##########################################################################
    # Helper functions

    def _verify_received_commands(self, payloads, expected_commands):
        """
        Feed payloads to the channel and verify the commands posted by it.

        A post_task expectation is recorded on the mocked task loop for every
        entry of |expected_commands|. Then each payload is written to the
        simulated tty and the channel's read handler is invoked once for it.
        Finally the recorded expectations are verified.

        @param payloads: List of raw strings to feed, one read event each.
        @param expected_commands: List of commands expected to be posted to
                the task loop, in the order the expectations are recorded.

        """
        def callback(channel, payload):
            pass

        self._at_channel._receiver_callback = callback
        for command in expected_commands:
            self._mox_task_loop.post_task(callback, command)

        self.mox.ReplayAll()
        for payload in payloads:
            self._send_command_remote(payload)
            self._at_channel._handle_channel_cb(self._channel_file.fileno(),
                                                glib.IO_IN)
        self.mox.VerifyAll()


    def _clean_channel_file(self):
        """
        Clean the tempfile used to simulate tty, and update the r/w head.

        The file is truncated to zero length and the position reported by the
        file object afterwards is remembered as the new head. The file is not
        explicitly rewound here.

        """
        self._channel_file.truncate(0)
        self._channel_file_head = self._channel_file.tell()


    def _send_command_remote(self, payload):
        """
        Simulate a command being sent from the remote tty port.

        The payload is written to a freshly cleaned tempfile and the file is
        then positioned back at the head, ready to be read by the channel.

        @param payload: The command to send.

        """
        self._clean_channel_file()
        self._channel_file.write(payload)
        self._channel_file.flush()
        self._channel_file.seek(self._channel_file_head)


    def _recieve_command_remote(self):
        """
        Simulate a command being received at the remote tty port.

        Reads everything from the remembered head of the tempfile up to EOF,
        then cleans the tempfile.

        @return The data read from the tempfile, as a single string.

        """
        self._channel_file.flush()
        self._channel_file.seek(self._channel_file_head)
        read_chunk = functools.partial(self._channel_file.read, 128)
        # read() returns '' at EOF, which is the sentinel that ends iteration.
        payload = ''.join(iter(read_chunk, ''))
        self._clean_channel_file()
        return payload


    def _recieve_command_local_callback(self, payload):
        """
        Receiver callback handed to the ATChannel constructor; does nothing.

        @param payload: Ignored.

        """
        pass


if __name__ == '__main__':
    # When run as a script: configure root logging at DEBUG level, then hand
    # control to the unittest runner.
    log_level = logging.DEBUG
    logging.basicConfig(level=log_level)
    unittest.main()