// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/common/handle_watcher.h"
#include <map>
#include "base/atomic_sequence_num.h"
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/threading/thread.h"
#include "base/time/tick_clock.h"
#include "base/time/time.h"
#include "mojo/common/message_pump_mojo.h"
#include "mojo/common/message_pump_mojo_handler.h"
namespace mojo {
namespace common {
typedef int WatcherID;
namespace {
const char kWatcherThreadName[] = "handle-watcher-thread";
// TODO(sky): this should be unnecessary once MessageLoop has been refactored.
MessagePumpMojo* message_pump_mojo = NULL;
scoped_ptr<base::MessagePump> CreateMessagePumpMojo() {
message_pump_mojo = new MessagePumpMojo;
return scoped_ptr<base::MessagePump>(message_pump_mojo).Pass();
}
// Tracks the data for a single call to Start().
struct WatchData {
WatchData()
: id(0),
wait_flags(MOJO_WAIT_FLAG_NONE),
message_loop(NULL) {}
WatcherID id;
Handle handle;
MojoWaitFlags wait_flags;
base::TimeTicks deadline;
base::Callback<void(MojoResult)> callback;
scoped_refptr<base::MessageLoopProxy> message_loop;
};
// WatcherBackend --------------------------------------------------------------
// WatcherBackend is responsible for managing the requests and interacting with
// MessagePumpMojo. All access (outside of creation/destruction) is done on the
// thread WatcherThreadManager creates.
class WatcherBackend : public MessagePumpMojoHandler {
public:
WatcherBackend();
virtual ~WatcherBackend();
void StartWatching(const WatchData& data);
void StopWatching(WatcherID watcher_id);
private:
typedef std::map<Handle, WatchData> HandleToWatchDataMap;
// Invoked when a handle needs to be removed and notified.
void RemoveAndNotify(const Handle& handle, MojoResult result);
// Searches through |handle_to_data_| for |watcher_id|. Returns true if found
// and sets |handle| to the Handle. Returns false if not a known id.
bool GetMojoHandleByWatcherID(WatcherID watcher_id, Handle* handle) const;
// MessagePumpMojoHandler overrides:
virtual void OnHandleReady(const Handle& handle) OVERRIDE;
virtual void OnHandleError(const Handle& handle, MojoResult result) OVERRIDE;
// Maps from assigned id to WatchData.
HandleToWatchDataMap handle_to_data_;
DISALLOW_COPY_AND_ASSIGN(WatcherBackend);
};
WatcherBackend::WatcherBackend() {
}
WatcherBackend::~WatcherBackend() {
}
void WatcherBackend::StartWatching(const WatchData& data) {
RemoveAndNotify(data.handle, MOJO_RESULT_CANCELLED);
DCHECK_EQ(0u, handle_to_data_.count(data.handle));
handle_to_data_[data.handle] = data;
message_pump_mojo->AddHandler(this, data.handle,
data.wait_flags,
data.deadline);
}
void WatcherBackend::StopWatching(WatcherID watcher_id) {
// Because of the thread hop it is entirely possible to get here and not
// have a valid handle registered for |watcher_id|.
Handle handle;
if (!GetMojoHandleByWatcherID(watcher_id, &handle))
return;
handle_to_data_.erase(handle);
message_pump_mojo->RemoveHandler(handle);
}
void WatcherBackend::RemoveAndNotify(const Handle& handle,
MojoResult result) {
if (handle_to_data_.count(handle) == 0)
return;
const WatchData data(handle_to_data_[handle]);
handle_to_data_.erase(handle);
message_pump_mojo->RemoveHandler(handle);
data.message_loop->PostTask(FROM_HERE, base::Bind(data.callback, result));
}
bool WatcherBackend::GetMojoHandleByWatcherID(WatcherID watcher_id,
Handle* handle) const {
for (HandleToWatchDataMap::const_iterator i = handle_to_data_.begin();
i != handle_to_data_.end(); ++i) {
if (i->second.id == watcher_id) {
*handle = i->second.handle;
return true;
}
}
return false;
}
void WatcherBackend::OnHandleReady(const Handle& handle) {
RemoveAndNotify(handle, MOJO_RESULT_OK);
}
void WatcherBackend::OnHandleError(const Handle& handle, MojoResult result) {
RemoveAndNotify(handle, result);
}
// WatcherThreadManager --------------------------------------------------------
// WatcherThreadManager manages the background thread that listens for handles
// to be ready. All requests are handled by WatcherBackend.
class WatcherThreadManager {
public:
// Returns the shared instance.
static WatcherThreadManager* GetInstance();
// Starts watching the requested handle. Returns a unique ID that is used to
// stop watching the handle. When the handle is ready |callback| is notified
// on the thread StartWatching() was invoked on.
// This may be invoked on any thread.
WatcherID StartWatching(const Handle& handle,
MojoWaitFlags wait_flags,
base::TimeTicks deadline,
const base::Callback<void(MojoResult)>& callback);
// Stops watching a handle.
// This may be invoked on any thread.
void StopWatching(WatcherID watcher_id);
private:
friend struct base::DefaultLazyInstanceTraits<WatcherThreadManager>;
WatcherThreadManager();
~WatcherThreadManager();
base::Thread thread_;
base::AtomicSequenceNumber watcher_id_generator_;
WatcherBackend backend_;
DISALLOW_COPY_AND_ASSIGN(WatcherThreadManager);
};
WatcherThreadManager* WatcherThreadManager::GetInstance() {
static base::LazyInstance<WatcherThreadManager> instance =
LAZY_INSTANCE_INITIALIZER;
return &instance.Get();
}
WatcherID WatcherThreadManager::StartWatching(
const Handle& handle,
MojoWaitFlags wait_flags,
base::TimeTicks deadline,
const base::Callback<void(MojoResult)>& callback) {
WatchData data;
data.id = watcher_id_generator_.GetNext();
data.handle = handle;
data.callback = callback;
data.wait_flags = wait_flags;
data.deadline = deadline;
data.message_loop = base::MessageLoopProxy::current();
// We outlive |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StartWatching,
base::Unretained(&backend_),
data));
return data.id;
}
void WatcherThreadManager::StopWatching(WatcherID watcher_id) {
// We outlive |thread_|, so it's safe to use Unretained() here.
thread_.message_loop()->PostTask(
FROM_HERE,
base::Bind(&WatcherBackend::StopWatching,
base::Unretained(&backend_),
watcher_id));
}
WatcherThreadManager::WatcherThreadManager()
: thread_(kWatcherThreadName) {
base::Thread::Options thread_options;
thread_options.message_pump_factory = base::Bind(&CreateMessagePumpMojo);
thread_.StartWithOptions(thread_options);
}
WatcherThreadManager::~WatcherThreadManager() {
thread_.Stop();
}
} // namespace
// HandleWatcher::StartState ---------------------------------------------------
// Contains the information passed to Start().
struct HandleWatcher::StartState {
explicit StartState(HandleWatcher* watcher) : weak_factory(watcher) {
}
~StartState() {
}
// ID assigned by WatcherThreadManager.
WatcherID watcher_id;
// Callback to notify when done.
base::Callback<void(MojoResult)> callback;
// When Start() is invoked a callback is passed to WatcherThreadManager
// using a WeakRef from |weak_refactory_|. The callback invokes
// OnHandleReady() (on the thread Start() is invoked from) which in turn
// notifies |callback_|. Doing this allows us to reset state when the handle
// is ready, and then notify the callback. Doing this also means Stop()
// cancels any pending callbacks that may be inflight.
base::WeakPtrFactory<HandleWatcher> weak_factory;
};
// HandleWatcher ---------------------------------------------------------------
// static
base::TickClock* HandleWatcher::tick_clock_ = NULL;
HandleWatcher::HandleWatcher() {
}
HandleWatcher::~HandleWatcher() {
Stop();
}
void HandleWatcher::Start(const Handle& handle,
MojoWaitFlags wait_flags,
MojoDeadline deadline,
const base::Callback<void(MojoResult)>& callback) {
DCHECK(handle.is_valid());
DCHECK_NE(MOJO_WAIT_FLAG_NONE, wait_flags);
Stop();
start_state_.reset(new StartState(this));
start_state_->callback = callback;
start_state_->watcher_id =
WatcherThreadManager::GetInstance()->StartWatching(
handle,
wait_flags,
MojoDeadlineToTimeTicks(deadline),
base::Bind(&HandleWatcher::OnHandleReady,
start_state_->weak_factory.GetWeakPtr()));
}
void HandleWatcher::Stop() {
if (!start_state_.get())
return;
scoped_ptr<StartState> old_state(start_state_.Pass());
WatcherThreadManager::GetInstance()->StopWatching(old_state->watcher_id);
}
void HandleWatcher::OnHandleReady(MojoResult result) {
DCHECK(start_state_.get());
scoped_ptr<StartState> old_state(start_state_.Pass());
old_state->callback.Run(result);
// NOTE: We may have been deleted during callback execution.
}
// static
base::TimeTicks HandleWatcher::NowTicks() {
return tick_clock_ ? tick_clock_->NowTicks() : base::TimeTicks::Now();
}
// static
base::TimeTicks HandleWatcher::MojoDeadlineToTimeTicks(MojoDeadline deadline) {
return deadline == MOJO_DEADLINE_INDEFINITE ? base::TimeTicks() :
NowTicks() + base::TimeDelta::FromMicroseconds(deadline);
}
} // namespace common
} // namespace mojo