普通文本  |  296行  |  9.15 KB

# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""This module provides cras DBus audio utilities."""

import logging
import multiprocessing
import pprint

from autotest_lib.client.cros.audio import cras_utils


def _set_default_main_loop():
    """Sets the gobject main loop to be the event loop for DBus.

    @raises: ImportError if dbus.mainloop.glib can not be imported.

    """
    try:
        import dbus.mainloop.glib
    except ImportError, e:
        logging.exception(
                'Can not import dbus.mainloop.glib: %s. '
                'This method should only be called on Cros device.', e)
        raise
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)


def _get_gobject():
    """Tries to import gobject.

    @returns: The imported gobject module.

    @raises: ImportError if gobject can not be imported.

    """
    try:
        import gobject
    except ImportError, e:
        logging.exception(
                'Can not import gobject: %s. This method should only be '
                'called on Cros device.', e)
        raise
    return gobject


class CrasDBusMonitorError(Exception):
    """Error in CrasDBusMonitor."""
    pass


class CrasDBusMonitor(object):
    """Monitor for DBus signal from Cras."""
    def __init__(self):
        _set_default_main_loop()
        # Acquires a new Cras interface through a new dbus.SystemBus instance
        # which has default main loop.
        self._iface = cras_utils.get_cras_control_interface(private=True)
        self._loop = _get_gobject().MainLoop()
        self._count = 0


class CrasDBusSignalListener(CrasDBusMonitor):
    """Listener for certain signal."""
    def __init__(self):
        super(CrasDBusSignalListener, self).__init__()
        self._target_signal_count = 0


    def wait_for_nodes_changed(self, target_signal_count, timeout_secs):
        """Waits for NodesChanged signal.

        @param target_signal_count: The expected number of signal.
        @param timeout_secs: The timeout in seconds.

        @raises: CrasDBusMonitorError if there is no enough signals before
                 timeout.

        """
        self._target_signal_count = target_signal_count
        signal_match = self._iface.connect_to_signal(
                'NodesChanged', self._nodes_changed_handler)
        _get_gobject().timeout_add(
                timeout_secs * 1000, self._timeout_quit_main_loop)

        # Blocks here until _nodes_changed_handler or _timeout_quit_main_loop
        # quits the loop.
        self._loop.run()

        signal_match.remove()
        if self._count < self._target_signal_count:
            raise CrasDBusMonitorError('Timeout')


    def _nodes_changed_handler(self):
        """Handler for NodesChanged signal."""
        if self._loop.is_running():
            logging.debug('Got NodesChanged signal when loop is running.')
            self._count = self._count + 1
            logging.debug('count = %d', self._count)
            if self._count >= self._target_signal_count:
                logging.debug('Quit main loop')
                self._loop.quit()
        else:
            logging.debug('Got NodesChanged signal when loop is not running.'
                          ' Ignore it')


    def _timeout_quit_main_loop(self):
        """Handler for timeout in main loop.

        @returns: False so this callback will not be called again.

        """
        if self._loop.is_running():
            logging.error('Quit main loop because of timeout')
            self._loop.quit()
        else:
            logging.debug(
                    'Got _quit_main_loop after main loop quits. Ignore it')

        return False


class CrasDBusBackgroundSignalCounter(object):
    """Controls signal counter which runs in background."""
    def __init__(self):
        self._proc = None
        self._signal_name = None
        self._counter = None
        self._parent_conn = None
        self._child_conn = None


    def start(self, signal_name):
        """Starts the signal counter in a subprocess.

        @param signal_name: The name of the signal to count.

        """
        self._signal_name = signal_name
        self._parent_conn, self._child_conn = multiprocessing.Pipe()
        self._proc = multiprocessing.Process(
                target=self._run, args=(self._child_conn,))
        self._proc.daemon = True
        self._proc.start()


    def _run(self, child_conn):
        """Runs CrasDBusCounter.

        This should be called in a subprocess.
        This blocks until parent_conn send stop command to the pipe.

        """
        self._counter = CrasDBusCounter(self._signal_name, child_conn)
        self._counter.run()


    def stop(self):
        """Stops the CrasDBusCounter by sending stop command to parent_conn.

        The result of CrasDBusCounter in its subproces can be obtained by
        reading from parent_conn.

        @returns: The count of the signal of interest.

        """
        self._parent_conn.send(CrasDBusCounter.STOP_CMD)
        return self._parent_conn.recv()


class CrasDBusCounter(CrasDBusMonitor):
    """Counter for DBus signal sent from Cras"""

    _CHECK_QUIT_PERIOD_SECS = 0.1
    STOP_CMD = 'stop'

    def __init__(self, signal_name, child_conn, ignore_redundant=True):
        """Initializes a CrasDBusCounter.

        @param signal_name: The name of the signal of interest.
        @param child_conn: A multiprocessing.Pipe which is used to receive stop
                     signal and to send the counting result.
        @param ignore_redundant: Ignores signal if GetNodes result stays the
                     same. This happens when there is change in unplugged nodes,
                     which does not affect Cras client.

        """
        super(CrasDBusCounter, self).__init__()
        self._signal_name = signal_name
        self._count = None
        self._child_conn = child_conn
        self._ignore_redundant = ignore_redundant
        self._nodes = None


    def run(self):
        """Runs the gobject main loop and listens for the signal."""
        self._count = 0

        self._nodes = cras_utils.get_cras_nodes()
        logging.debug('Before starting the counter')
        logging.debug('nodes = %s', pprint.pformat(self._nodes))

        signal_match = self._iface.connect_to_signal(
                self._signal_name, self._signal_handler)
        _get_gobject().timeout_add(
                 int(self._CHECK_QUIT_PERIOD_SECS * 1000),
                 self._check_quit_main_loop)

        logging.debug('Start counting for signal %s', self._signal_name)

        # Blocks here until _check_quit_main_loop quits the loop.
        self._loop.run()

        signal_match.remove()

        logging.debug('Count result: %s', self._count)
        self._child_conn.send(self._count)


    def _signal_handler(self):
        """Handler for signal."""
        if self._loop.is_running():
            logging.debug('Got %s signal when loop is running.',
                          self._signal_name)

            logging.debug('Getting nodes.')
            nodes = cras_utils.get_cras_nodes()
            logging.debug('nodes = %s', pprint.pformat(nodes))
            if self._ignore_redundant and self._nodes == nodes:
                logging.debug('Nodes did not change. Ignore redundant signal')
                return

            self._count = self._count + 1
            logging.debug('count = %d', self._count)
        else:
            logging.debug('Got %s signal when loop is not running.'
                          ' Ignore it', self._signal_name)


    def _should_stop(self):
        """Checks if user wants to stop main loop."""
        if self._child_conn.poll():
            if self._child_conn.recv() == self.STOP_CMD:
                logging.debug('Should stop')
                return True
        return False


    def _check_quit_main_loop(self):
        """Handler for timeout in main loop.

        @returns: True so this callback will not be called again.
                  False if user quits main loop.

        """
        if self._loop.is_running():
            logging.debug('main loop is running in _check_quit_main_loop')
            if self._should_stop():
                logging.debug('Quit main loop because of stop command')
                self._loop.quit()
                return False
            else:
                logging.debug('No stop command, keep running')
                return True
        else:
            logging.debug(
                    'Got _quit_main_loop after main loop quits. Ignore it')

            return False


class CrasDBusMonitorUnexpectedNodesChanged(Exception):
    """Error for unexpected nodes changed."""
    pass


def wait_for_unexpected_nodes_changed(timeout_secs):
    """Waits for unexpected nodes changed signal in this blocking call.

    @param timeout_secs: Timeout in seconds for waiting.

    @raises CrasDBusMonitorUnexpectedNodesChanged if there is NodesChanged
            signal

    """
    try:
        CrasDBusSignalListener().wait_for_nodes_changed(1, timeout_secs)
    except CrasDBusMonitorError:
        logging.debug('There is no NodesChanged signal, as expected')
        return
    raise CrasDBusMonitorUnexpectedNodesChanged()