// Copyright 2015 The Weave Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "src/commands/command_queue.h" #include <base/bind.h> #include <base/time/time.h> namespace weave { namespace { const int kRemoveCommandDelayMin = 5; std::string GetCommandHandlerKey(const std::string& component_path, const std::string& command_name) { return component_path + ":" + command_name; } } CommandQueue::CommandQueue(provider::TaskRunner* task_runner, base::Clock* clock) : task_runner_{task_runner}, clock_{clock} {} void CommandQueue::AddCommandAddedCallback(const CommandCallback& callback) { on_command_added_.push_back(callback); // Send all pre-existed commands. for (const auto& command : map_) callback.Run(command.second.get()); } void CommandQueue::AddCommandRemovedCallback(const CommandCallback& callback) { on_command_removed_.push_back(callback); } void CommandQueue::AddCommandHandler( const std::string& component_path, const std::string& command_name, const Device::CommandHandlerCallback& callback) { if (!command_name.empty()) { CHECK(default_command_callback_.is_null()) << "Commands specific handler are not allowed after default one"; for (const auto& command : map_) { if (command.second->GetState() == Command::State::kQueued && command.second->GetName() == command_name && command.second->GetComponent() == component_path) { callback.Run(command.second); } } std::string key = GetCommandHandlerKey(component_path, command_name); CHECK(command_callbacks_.insert(std::make_pair(key, callback)).second) << command_name << " already has handler"; } else { CHECK(component_path.empty()) << "Default handler must not be component-specific"; for (const auto& command : map_) { std::string key = GetCommandHandlerKey(command.second->GetComponent(), command.second->GetName()); if (command.second->GetState() == Command::State::kQueued && command_callbacks_.find(key) == command_callbacks_.end()) { callback.Run(command.second); } } CHECK(default_command_callback_.is_null()) << "Already has default handler"; default_command_callback_ = callback; } } void CommandQueue::Add(std::unique_ptr<CommandInstance> instance) { std::string id = instance->GetID(); LOG_IF(FATAL, id.empty()) << "Command has no ID"; instance->AttachToQueue(this); auto pair = map_.insert(std::make_pair(id, std::move(instance))); LOG_IF(FATAL, !pair.second) << "Command with ID '" << id << "' is already in the queue"; for (const auto& cb : on_command_added_) cb.Run(pair.first->second.get()); std::string key = GetCommandHandlerKey(pair.first->second->GetComponent(), pair.first->second->GetName()); auto it_handler = command_callbacks_.find(key); if (it_handler != command_callbacks_.end()) it_handler->second.Run(pair.first->second); else if (!default_command_callback_.is_null()) default_command_callback_.Run(pair.first->second); } void CommandQueue::RemoveLater(const std::string& id) { auto p = map_.find(id); if (p == map_.end()) return; auto remove_delay = base::TimeDelta::FromMinutes(kRemoveCommandDelayMin); remove_queue_.push(std::make_pair(clock_->Now() + remove_delay, id)); if (remove_queue_.size() == 1) { // The queue was empty, this is the first command to be removed, schedule // a clean-up task. ScheduleCleanup(remove_delay); } } bool CommandQueue::Remove(const std::string& id) { auto p = map_.find(id); if (p == map_.end()) return false; std::shared_ptr<CommandInstance> instance = p->second; instance->DetachFromQueue(); map_.erase(p); for (const auto& cb : on_command_removed_) cb.Run(instance.get()); return true; } void CommandQueue::Cleanup(const base::Time& cutoff_time) { while (!remove_queue_.empty() && remove_queue_.top().first <= cutoff_time) { Remove(remove_queue_.top().second); remove_queue_.pop(); } } void CommandQueue::ScheduleCleanup(base::TimeDelta delay) { task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&CommandQueue::PerformScheduledCleanup, weak_ptr_factory_.GetWeakPtr()), delay); } void CommandQueue::PerformScheduledCleanup() { base::Time now = clock_->Now(); Cleanup(now); if (!remove_queue_.empty()) ScheduleCleanup(remove_queue_.top().first - now); } CommandInstance* CommandQueue::Find(const std::string& id) const { auto p = map_.find(id); return (p != map_.end()) ? p->second.get() : nullptr; } } // namespace weave