普通文本  |  170行  |  6.42 KB

// Copyright 2015 The Weave Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "src/commands/cloud_command_proxy.h"

#include <base/bind.h>
#include <weave/enum_to_string.h>
#include <weave/provider/task_runner.h>

#include "src/commands/command_instance.h"
#include "src/commands/schema_constants.h"
#include "src/utils.h"

namespace weave {

CloudCommandProxy::CloudCommandProxy(
    CommandInstance* command_instance,
    CloudCommandUpdateInterface* cloud_command_updater,
    ComponentManager* component_manager,
    std::unique_ptr<BackoffEntry> backoff_entry,
    provider::TaskRunner* task_runner)
    : command_instance_{command_instance},
      cloud_command_updater_{cloud_command_updater},
      component_manager_{component_manager},
      task_runner_{task_runner},
      cloud_backoff_entry_{std::move(backoff_entry)} {
  callback_token_ = component_manager_->AddServerStateUpdatedCallback(
      base::Bind(&CloudCommandProxy::OnDeviceStateUpdated,
                 weak_ptr_factory_.GetWeakPtr()));
  observer_.Add(command_instance);
}

void CloudCommandProxy::OnErrorChanged() {
  std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
  patch->Set(commands::attributes::kCommand_Error,
             command_instance_->GetError()
                 ? ErrorInfoToJson(*command_instance_->GetError()).release()
                 : base::Value::CreateNullValue().release());
  QueueCommandUpdate(std::move(patch));
}

void CloudCommandProxy::OnResultsChanged() {
  std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
  patch->Set(commands::attributes::kCommand_Results,
             command_instance_->GetResults().CreateDeepCopy());
  QueueCommandUpdate(std::move(patch));
}

void CloudCommandProxy::OnStateChanged() {
  std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
  patch->SetString(commands::attributes::kCommand_State,
                   EnumToString(command_instance_->GetState()));
  QueueCommandUpdate(std::move(patch));
}

void CloudCommandProxy::OnProgressChanged() {
  std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue};
  patch->Set(commands::attributes::kCommand_Progress,
             command_instance_->GetProgress().CreateDeepCopy());
  QueueCommandUpdate(std::move(patch));
}

void CloudCommandProxy::OnCommandDestroyed() {
  delete this;
}

void CloudCommandProxy::QueueCommandUpdate(
    std::unique_ptr<base::DictionaryValue> patch) {
  ComponentManager::UpdateID id = component_manager_->GetLastStateChangeId();
  if (update_queue_.empty() || update_queue_.back().first != id) {
    // If queue is currently empty or the device state has changed since the
    // last patch request queued, add a new request to the queue.
    update_queue_.push_back(std::make_pair(id, std::move(patch)));
  } else {
    // Device state hasn't changed since the last time this command update
    // was queued. We can coalesce the command update patches, unless the
    // current request is already in flight to the server.
    if (update_queue_.size() == 1 && command_update_in_progress_) {
      // Can't update the request which is being sent to the server.
      // Queue a new update.
      update_queue_.push_back(std::make_pair(id, std::move(patch)));
    } else {
      // Coalesce the patches.
      update_queue_.back().second->MergeDictionary(patch.get());
    }
  }
  // Send out an update request to the server, if needed.

  // Post to accumulate more changes during the current message loop task run.
  task_runner_->PostDelayedTask(
      FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
                            backoff_weak_ptr_factory_.GetWeakPtr()),
      {});
}

void CloudCommandProxy::SendCommandUpdate() {
  if (command_update_in_progress_ || update_queue_.empty())
    return;

  // Check if we have any pending updates ready to be sent to the server.
  // We can only send updates for which the device state at the time the
  // requests have been queued were successfully propagated to the server.
  // That is, if the pending device state updates that we recorded while the
  // command update was queued haven't been acknowledged by the server, we
  // will hold the corresponding command updates until the related device state
  // has been successfully updated on the server.
  if (update_queue_.front().first > last_state_update_id_)
    return;

  backoff_weak_ptr_factory_.InvalidateWeakPtrs();
  if (cloud_backoff_entry_->ShouldRejectRequest()) {
    VLOG(1) << "Cloud request delayed for "
            << cloud_backoff_entry_->GetTimeUntilRelease()
            << " due to backoff policy";
    task_runner_->PostDelayedTask(
        FROM_HERE, base::Bind(&CloudCommandProxy::SendCommandUpdate,
                              backoff_weak_ptr_factory_.GetWeakPtr()),
        cloud_backoff_entry_->GetTimeUntilRelease());
    return;
  }

  // Coalesce any pending updates that were queued prior to the current device
  // state known to be propagated to the server successfully.
  auto iter = update_queue_.begin();
  auto start = ++iter;
  while (iter != update_queue_.end()) {
    if (iter->first > last_state_update_id_)
      break;
    update_queue_.front().first = iter->first;
    update_queue_.front().second->MergeDictionary(iter->second.get());
    ++iter;
  }
  // Remove all the intermediate items that have been merged into the first
  // entry.
  update_queue_.erase(start, iter);
  command_update_in_progress_ = true;
  cloud_command_updater_->UpdateCommand(
      command_instance_->GetID(), *update_queue_.front().second,
      base::Bind(&CloudCommandProxy::OnUpdateCommandDone,
                 weak_ptr_factory_.GetWeakPtr()));
}

void CloudCommandProxy::ResendCommandUpdate() {
  command_update_in_progress_ = false;
  SendCommandUpdate();
}

void CloudCommandProxy::OnUpdateCommandDone(ErrorPtr error) {
  command_update_in_progress_ = false;
  cloud_backoff_entry_->InformOfRequest(!error);
  if (!error) {
    // Remove the succeeded update from the queue.
    update_queue_.pop_front();
  }
  // If we have more pending updates, send a new request to the server
  // immediately, if possible.
  SendCommandUpdate();
}

void CloudCommandProxy::OnDeviceStateUpdated(
    ComponentManager::UpdateID update_id) {
  last_state_update_id_ = update_id;
  // Try to send out any queued command updates that could be performed after
  // a device state is updated.
  SendCommandUpdate();
}

}  // namespace weave