# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

import gobject
import logging
import time

from autotest_lib.client.bin import test
from autotest_lib.client.common_lib import error
from autotest_lib.client.cros.cellular import modem_utils
from autotest_lib.client.cros.mainloop import ExceptionForward
from autotest_lib.client.cros.mainloop import GenericTesterMainLoop
from autotest_lib.client.cros.networking import shill_proxy

DEFAULT_TEST_TIMEOUT_S = 600


class DisableTester(GenericTesterMainLoop):
  """Base class containing main test logic."""
  def __init__(self, *args, **kwargs):
    super(DisableTester, self).__init__(*args, **kwargs)

  @ExceptionForward
  def perform_one_test(self):
    """Called by GenericMainTesterMainLoop to execute the test."""
    self._configure()
    disable_delay_ms = (
        self.test_kwargs.get('delay_before_disable_ms', 0) +
        self.test.iteration *
        self.test_kwargs.get('disable_delay_per_iteration_ms', 0))
    gobject.timeout_add(disable_delay_ms, self._start_disable)
    self._start_test()

  @ExceptionForward
  def _connect_success_handler(self, *ignored_args):
    logging.info('connect succeeded')
    self.requirement_completed('connect')

  @ExceptionForward
  def _connect_error_handler(self, e):
    # We disabled while connecting; error is OK
    logging.info('connect errored: %s', e)
    self.requirement_completed('connect')

  @ExceptionForward
  def _start_disable(self):
    logging.info('disabling')
    self.disable_start = time.time()
    self._enable(False)

  @ExceptionForward
  def _disable_success_handler(self):
    disable_elapsed = time.time() - self.disable_start
    self.requirement_completed('disable')

  @ExceptionForward
  def _get_status_success_handler(self, status):
    logging.info('Got status')
    self.requirement_completed('get_status', warn_if_already_completed=False)
    if self.status_delay_ms:
      gobject.timeout_add(self.status_delay_ms, self._start_get_status)

  def after_main_loop(self):
    """Called by GenericTesterMainLoop after the main loop has exited."""
    enabled = self._enabled()
    logging.info('Modem enabled: %s', enabled)
    # Will return happily if no Gobi present
    modem_utils.ClearGobiModemFaultInjection()


class ShillDisableTester(DisableTester):
  """Tests that disable-while-connecting works at the shill level.
  Expected control flow:

  * self._configure() called; registers self._disable_property_changed
    to be called when device is en/disabled

  * Parent class sets a timer that calls self._enable(False) when it expires.

  * _start_test calls _start_connect() which sends a connect request to
    the device.

  * we wait for the modem to power off, at which point
    _disable_property_changed (registered above) will get called

  * _disable_property_changed() completes the 'disable' requirement,
    and we're done.

  """
  def __init__(self, *args, **kwargs):
    super(ShillDisableTester, self).__init__(*args, **kwargs)

  def _disable_property_changed(self, property, value, *args, **kwargs):
    self._disable_success_handler()

  def _start_test(self):
    # We would love to add requirements based on connect, but in many
    # scenarios, there is no observable response to a cancelled
    # connect: We issue a connect, it returns instantly to let us know
    # that the connect has started, but then the disable takes effect
    # and the connect fails.  We don't get a state change because no
    # state change has happened: the modem never got to a different
    # state before we cancelled
    self.remaining_requirements = set(['disable'])
    self._start_connect()

  def _configure(self):
    self.cellular_device = \
        self.test.test_env.shill.find_cellular_device_object()
    if self.cellular_device is None:
      raise error.TestError("Could not find cellular device")

    self.cellular_service = \
        self.test.test_env.shill.find_cellular_service_object()

    self.test.test_env.bus.add_signal_receiver(
            self.dispatch_property_changed,
            signal_name='PropertyChanged',
            dbus_interface=self.cellular_device.dbus_interface,
            path=self.cellular_device.object_path)

  @ExceptionForward
  def _expect_einprogress_handler(self, e):
    pass

  def _enable(self, value):
    self.property_changed_actions['Powered'] = self._disable_property_changed

    if value:
      self.cellular_device.Enable(
          reply_handler=self.ignore_handler,
          error_handler=self._expect_einprogress_handler)
    else:
      self.cellular_device.Disable(
          reply_handler=self.ignore_handler,
          error_handler=self._expect_einprogress_handler)

  @ExceptionForward
  def _start_connect(self):
    logging.info('connecting')

    def _log_connect_event(property, value, *ignored_args):
      logging.info('%s property changed: %s', property, value)

    self.property_changed_actions['Connected'] = _log_connect_event

    # Contrary to documentation, Connect just returns when it has
    # fired off the lower-level dbus messages.  So a success means
    # nothing to us.  But a failure means it didn't even try.
    self.cellular_service.Connect(
        reply_handler=self.ignore_handler,
        error_handler=self.build_error_handler('Connect'))

  def _enabled(self):
    return self.test.test_env.shill.get_dbus_property(
            self.cellular_device,
            shill_proxy.ShillProxy.DEVICE_PROPERTY_POWERED)


class ModemDisableTester(DisableTester):
  """Tests that disable-while-connecting works at the modem-manager level.

  Expected control flow:

  * _configure() is called.

  * Parent class sets a timer that calls self._enable(False) when it
    expires.

  * _start_test calls _start_connect() which sends a connect request to
    the device, also sets a timer that calls GetStatus on the modem.

  * wait for all three (connect, disable, get_status) to complete.

  """
  def __init__(self, *args, **kwargs):
    super(ModemDisableTester, self).__init__(*args, **kwargs)

  def _is_gobi(self):
    return 'Gobi' in self.test.test_env.modem.path

  def _start_test(self):
    self.remaining_requirements = set(['connect', 'disable'])

    # Only cromo/gobi-cromo-plugin maintain the invariant that GetStatus
    # will always succeed, so we only check it if the modem is a Gobi.
    if self._is_gobi():
      self.remaining_requirements.add('get_status')
      self.status_delay_ms = self.test_kwargs.get('status_delay_ms', 200)
      gobject.timeout_add(self.status_delay_ms, self._start_get_status)

    self._start_connect()

  def _configure(self):
    self.simple_modem = self.test.test_env.modem.SimpleModem()

    logging.info('Modem path: %s', self.test.test_env.modem.path)

    if self._is_gobi():
      self._configure_gobi()
    else:
      self._configure_non_gobi()

    service = self.test.test_env.shill.wait_for_cellular_service_object()
    if not service:
      raise error.TestError('Modem failed to register with the network after '
                            're-enabling.')

  def _configure_gobi(self):
    gobi_modem = self.test.test_env.modem.GobiModem()

    if 'async_connect_sleep_ms' in self.test_kwargs:
      sleep_ms = self.test_kwargs.get('async_connect_sleep_ms', 0)
      logging.info('Sleeping %d ms before connect', sleep_ms)
      gobi_modem.InjectFault('AsyncConnectSleepMs', sleep_ms)

    if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
      logging.info('Injecting QMI failure')
      gobi_modem.InjectFault('ConnectFailsWithErrorSendingQmiRequest', 1)

  def _configure_non_gobi(self):
    # Check to make sure no Gobi-specific arguments were specified.
    if 'async_connect_sleep_ms' in self.test_kwargs:
      raise error.TestError('async_connect_sleep_ms on non-Gobi modem')
    if 'connect_fails_with_error_sending_qmi_request' in self.test_kwargs:
      raise error.TestError(
          'connect_fails_with_error_sending_qmi_request on non-Gobi modem')

  @ExceptionForward
  def _start_connect(self):
    logging.info('connecting')

    retval = self.simple_modem.Connect(
        {},
        reply_handler=self._connect_success_handler,
        error_handler=self._connect_error_handler)
    logging.info('connect call made.  retval = %s', retval)


  @ExceptionForward
  def _start_get_status(self):
    # Keep on calling get_status to make sure it works at all times
    self.simple_modem.GetStatus(
        reply_handler=self._get_status_success_handler,
        error_handler=self.build_error_handler('GetStatus'))

  def _enabled(self):
    return self.test.test_env.modem.GetModemProperties().get('Enabled', -1)

  def _enable(self, value):
    self.test.test_env.modem.Enable(
        value,
        reply_handler=self._disable_success_handler,
        error_handler=self.build_error_handler('Enable'))


class cellular_DisableWhileConnecting(test.test):
  """Check that the modem can handle a disconnect while connecting."""
  version = 1

  def run_once(self, test_env, **kwargs):
    self.test_env = test_env
    timeout_s = kwargs.get('timeout_s', DEFAULT_TEST_TIMEOUT_S)
    gobject_main_loop = gobject.MainLoop()

    with test_env:
      logging.info('Shill-level test')
      shill_level_test = ShillDisableTester(self,
                                            gobject_main_loop,
                                            timeout_s=timeout_s)
      shill_level_test.run(**kwargs)

    with test_env:
      try:
        logging.info('Modem-level test')
        modem_level_test = ModemDisableTester(self,
                                              gobject_main_loop,
                                              timeout_s=timeout_s)
        modem_level_test.run(**kwargs)
      finally:
        modem_utils.ClearGobiModemFaultInjection()