// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/bindings/connector.h"
#include <stdint.h>
#include "base/bind.h"
#include "base/lazy_instance.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/macros.h"
#include "base/memory/ptr_util.h"
#include "base/message_loop/message_loop_current.h"
#include "base/run_loop.h"
#include "base/synchronization/lock.h"
#include "base/threading/thread_local.h"
#include "base/trace_event/trace_event.h"
#include "mojo/public/cpp/bindings/lib/may_auto_lock.h"
#include "mojo/public/cpp/bindings/mojo_buildflags.h"
#include "mojo/public/cpp/bindings/sync_handle_watcher.h"
#include "mojo/public/cpp/system/wait.h"
#if defined(ENABLE_IPC_FUZZER)
#include "mojo/public/cpp/bindings/message_dumper.h"
#endif
namespace mojo {
namespace {
// The NestingObserver for each thread. Note that this is always a
// Connector::RunLoopNestingObserver; we use the base type here because that
// subclass is private to Connector.
// The slot is filled lazily by RunLoopNestingObserver::GetForThread() and
// cleared again in RunLoopNestingObserver::WillDestroyCurrentMessageLoop().
base::LazyInstance<base::ThreadLocalPointer<base::RunLoop::NestingObserver>>::
Leaky g_tls_nesting_observer = LAZY_INSTANCE_INITIALIZER;
// The default outgoing serialization mode for new Connectors. Copied into
// each Connector by its constructor; within this file it is only reassigned
// by Connector::OverrideDefaultSerializationBehaviorForTesting().
Connector::OutgoingSerializationMode g_default_outgoing_serialization_mode =
Connector::OutgoingSerializationMode::kLazy;
// The default incoming serialization mode for new Connectors. Copied into
// each Connector by its constructor; within this file it is only reassigned
// by Connector::OverrideDefaultSerializationBehaviorForTesting().
Connector::IncomingSerializationMode g_default_incoming_serialization_mode =
Connector::IncomingSerializationMode::kDispatchAsIs;
} // namespace
// Used to efficiently maintain a doubly-linked list of all Connectors
// currently dispatching on any given thread. Each tracker links itself in as
// the new top of the chain rooted at its RunLoopNestingObserver on
// construction and unlinks itself on destruction.
class Connector::ActiveDispatchTracker {
public:
// Links this tracker in as the top of the chain held by |connector|'s nesting
// observer. |connector| is dereferenced immediately, so it must still be
// valid, and its nesting observer must be non-null (DCHECKed).
explicit ActiveDispatchTracker(const base::WeakPtr<Connector>& connector);
// Unlinks this tracker from the chain, repairing the neighbours' links.
~ActiveDispatchTracker();
// Calls ArmOrNotify() on the handle watcher of this tracker's Connector, if
// both the Connector and its watcher still exist, then forwards the
// notification to |outer_tracker_|.
void NotifyBeginNesting();
private:
// Weak because the Connector may be destroyed while a dispatch is on the
// stack; checked before use in NotifyBeginNesting().
const base::WeakPtr<Connector> connector_;
// Observer whose |top_tracker_| heads the chain this tracker belongs to.
RunLoopNestingObserver* const nesting_observer_;
// Tracker that was on top when this one was constructed (null if none).
ActiveDispatchTracker* outer_tracker_ = nullptr;
// Tracker constructed on top of this one (null while this one is the top).
ActiveDispatchTracker* inner_tracker_ = nullptr;
DISALLOW_COPY_AND_ASSIGN(ActiveDispatchTracker);
};
// Per-thread observer of the current MessageLoop. Whenever a nested run loop
// begins it notifies the chain of ActiveDispatchTrackers rooted at
// |top_tracker_|. Instances are heap-allocated by GetForThread(), stored in
// |g_tls_nesting_observer|, and delete themselves when the MessageLoop is
// about to be destroyed.
class Connector::RunLoopNestingObserver
    : public base::RunLoop::NestingObserver,
      public base::MessageLoopCurrent::DestructionObserver {
 public:
  // Returns the calling thread's observer, creating and recording it in the
  // thread-local slot on first use. Returns null when the thread has no
  // current MessageLoop.
  static RunLoopNestingObserver* GetForThread() {
    if (!base::MessageLoopCurrent::Get())
      return nullptr;

    auto& tls_slot = g_tls_nesting_observer.Get();
    if (auto* existing = static_cast<RunLoopNestingObserver*>(tls_slot.Get()))
      return existing;

    auto* created = new RunLoopNestingObserver;
    tls_slot.Set(created);
    return created;
  }

  // Registers with both the RunLoop nesting notifications and the current
  // MessageLoop's destruction notifications.
  RunLoopNestingObserver() {
    base::RunLoop::AddNestingObserverOnCurrentThread(this);
    base::MessageLoopCurrent::Get()->AddDestructionObserver(this);
  }

  ~RunLoopNestingObserver() override {}

  // base::RunLoop::NestingObserver:
  // Forwards the event to the innermost active tracker, if there is one.
  void OnBeginNestedRunLoop() override {
    if (top_tracker_)
      top_tracker_->NotifyBeginNesting();
  }

  // base::MessageLoopCurrent::DestructionObserver:
  // Undoes both registrations, clears the thread-local slot and self-deletes.
  void WillDestroyCurrentMessageLoop() override {
    base::RunLoop::RemoveNestingObserverOnCurrentThread(this);
    base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this);
    DCHECK_EQ(this, g_tls_nesting_observer.Get().Get());
    g_tls_nesting_observer.Get().Set(nullptr);
    delete this;
  }

 private:
  friend class ActiveDispatchTracker;

  // Innermost tracker on this thread; maintained by ActiveDispatchTracker.
  ActiveDispatchTracker* top_tracker_ = nullptr;

  DISALLOW_COPY_AND_ASSIGN(RunLoopNestingObserver);
};
// Pushes this tracker onto the chain held by |connector|'s nesting observer:
// the previous top (if any) becomes |outer_tracker_| and this tracker becomes
// the new top.
Connector::ActiveDispatchTracker::ActiveDispatchTracker(
    const base::WeakPtr<Connector>& connector)
    : connector_(connector), nesting_observer_(connector_->nesting_observer_) {
  DCHECK(nesting_observer_);

  ActiveDispatchTracker* const previous_top = nesting_observer_->top_tracker_;
  if (previous_top) {
    outer_tracker_ = previous_top;
    previous_top->inner_tracker_ = this;
  }
  nesting_observer_->top_tracker_ = this;
}
// Removes this tracker from the chain. Whatever used to point at this
// tracker from the inner side (the observer's top pointer, or the inner
// neighbour) is redirected to |outer_tracker_|, and the outer neighbour is
// redirected to |inner_tracker_|.
Connector::ActiveDispatchTracker::~ActiveDispatchTracker() {
  const bool is_top = (nesting_observer_->top_tracker_ == this);
  if (is_top) {
    nesting_observer_->top_tracker_ = outer_tracker_;
  } else if (inner_tracker_) {
    inner_tracker_->outer_tracker_ = outer_tracker_;
  }

  if (outer_tracker_) {
    outer_tracker_->inner_tracker_ = inner_tracker_;
  }
}
void Connector::ActiveDispatchTracker::NotifyBeginNesting() {
if (connector_ && connector_->handle_watcher_)
connector_->handle_watcher_->ArmOrNotify();
if (outer_tracker_)
outer_tracker_->NotifyBeginNesting();
}
// Moves |message_pipe| into |message_pipe_| and immediately starts watching
// it. |runner| is stored as |task_runner_| and is later handed to the
// SimpleWatcher created in WaitToReadMore(). The serialization modes are
// copied from the current process-wide defaults, and the nesting observer is
// looked up for the constructing thread.
Connector::Connector(ScopedMessagePipeHandle message_pipe,
ConnectorConfig config,
scoped_refptr<base::SequencedTaskRunner> runner)
: message_pipe_(std::move(message_pipe)),
task_runner_(std::move(runner)),
error_(false),
outgoing_serialization_mode_(g_default_outgoing_serialization_mode),
incoming_serialization_mode_(g_default_incoming_serialization_mode),
nesting_observer_(RunLoopNestingObserver::GetForThread()),
weak_factory_(this) {
// |lock_| only exists in MULTI_THREADED_SEND mode; the internal::MayAutoLock
// users in this file are handed |&lock_| either way.
if (config == MULTI_THREADED_SEND)
lock_.emplace();
#if defined(ENABLE_IPC_FUZZER)
// Only create a dumper when a dump directory has been configured.
if (!MessageDumper::GetMessageDumpDirectory().empty())
message_dumper_ = std::make_unique<MessageDumper>();
#endif
// Cache one weak pointer; the read paths copy it to detect destruction or
// pipe hand-off that happens during dispatch.
weak_self_ = weak_factory_.GetWeakPtr();
// Even though we don't have an incoming receiver, we still want to monitor
// the message pipe to know if it is closed or encounters an error.
WaitToReadMore();
}
// Tears down the watchers. When the pipe has already been passed away
// (|connected_| is false) there is nothing left to cancel, so destruction is
// allowed on any sequence and returns right after the check.
Connector::~Connector() {
  bool still_connected = false;
  {
    // |connected_| is guarded by |connected_lock_|; hold it only for the read.
    base::AutoLock lock(connected_lock_);
    still_connected = connected_;
  }
  if (!still_connected)
    return;

  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
  CancelWait();
}
// Replaces the outgoing serialization mode this Connector copied from the
// process-wide default at construction. The value is consulted by
// PrefersSerializedMessages(). Must be called on the Connector's sequence.
void Connector::SetOutgoingSerializationMode(OutgoingSerializationMode mode) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
outgoing_serialization_mode_ = mode;
}
// Replaces the incoming serialization mode this Connector copied from the
// process-wide default at construction. The value is consulted by
// ReadSingleMessage() just before a message is dispatched. Must be called on
// the Connector's sequence.
void Connector::SetIncomingSerializationMode(IncomingSerializationMode mode) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
incoming_serialization_mode_ = mode;
}
void Connector::CloseMessagePipe() {
// Throw away the returned message pipe.
PassMessagePipe();
}
// Stops watching the pipe and transfers ownership of it to the caller. After
// this call |message_pipe_| is in a moved-from state, all outstanding weak
// pointers are invalid and the Connector is marked as disconnected.
// Must be called on the Connector's sequence.
ScopedMessagePipeHandle Connector::PassMessagePipe() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
// Destroy the watchers before the handle they observe is moved away.
CancelWait();
// |message_pipe_| is also touched by Accept(), which takes the same lock.
internal::MayAutoLock locker(&lock_);
ScopedMessagePipeHandle message_pipe = std::move(message_pipe_);
// Lets ReadSingleMessage(), ReadAllAvailableMessages() and
// OnSyncHandleWatcherHandleReady() notice, via their weak pointer copies,
// that the pipe went away in the middle of a dispatch.
weak_factory_.InvalidateWeakPtrs();
sync_handle_watcher_callback_count_ = 0;
// Enables the early-return path in the destructor.
base::AutoLock lock(connected_lock_);
connected_ = false;
return message_pipe;
}
// Simulates a connection error: the pipe is reset and the error is
// delivered asynchronously (see HandleError()). Must be called on the
// Connector's sequence.
void Connector::RaiseError() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  constexpr bool kForcePipeReset = true;
  constexpr bool kForceAsyncHandler = true;
  HandleError(kForcePipeReset, kForceAsyncHandler);
}
// Tries to read and dispatch exactly one incoming message.
//
// |deadline| must be 0 (poll: give up immediately when the pipe is not
// readable) or MOJO_DEADLINE_INDEFINITE (block until the pipe is readable or
// the wait fails). Returns true only when a message was actually read.
// Incoming processing is resumed first if it was paused.
bool Connector::WaitForIncomingMessage(MojoDeadline deadline) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  if (error_)
    return false;

  ResumeIncomingMethodCallProcessing();

  // TODO(rockot): Use a timed Wait here. Nobody uses anything but 0 or
  // INDEFINITE deadlines at present, so we only support those.
  DCHECK(deadline == 0 || deadline == MOJO_DEADLINE_INDEFINITE);

  const bool poll_only = (deadline == 0);
  if (poll_only && !message_pipe_->QuerySignalsState().readable())
    return false;

  const bool block_until_readable = (deadline == MOJO_DEADLINE_INDEFINITE);
  if (block_until_readable) {
    const MojoResult wait_result =
        Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE);
    if (wait_result != MOJO_RESULT_OK) {
      // Callers of WaitForIncomingMessage() must already tolerate re-entry,
      // so the error handler is invoked synchronously here.
      HandleError(wait_result != MOJO_RESULT_FAILED_PRECONDITION, false);
      return false;
    }
  }

  // Only the raw read result matters to the caller; the "keep reading"
  // return value of ReadSingleMessage() is deliberately ignored.
  MojoResult read_result = MOJO_RESULT_UNKNOWN;
  ignore_result(ReadSingleMessage(&read_result));
  return read_result == MOJO_RESULT_OK;
}
// Stops reading incoming messages until
// ResumeIncomingMethodCallProcessing() is called. A no-op when already
// paused. Must be called on the Connector's sequence.
void Connector::PauseIncomingMethodCallProcessing() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  if (!paused_) {
    paused_ = true;
    // Tear the watchers down so that nothing is read while paused.
    CancelWait();
  }
}
// Undoes PauseIncomingMethodCallProcessing(): clears the paused flag and
// starts watching the pipe again. A no-op when not paused. Must be called on
// the Connector's sequence.
void Connector::ResumeIncomingMethodCallProcessing() {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  if (paused_) {
    // The flag must be cleared first: WaitToReadMore() CHECKs !paused_.
    paused_ = false;
    WaitToReadMore();
  }
}
// Reports whether outgoing messages should be serialized up front. Always
// true in kEager mode. In kLazy mode it is true only when a peer-remoteness
// tracker exists and its last known state says the peer is remote.
bool Connector::PrefersSerializedMessages() {
  if (outgoing_serialization_mode_ == OutgoingSerializationMode::kEager)
    return true;
  DCHECK_EQ(OutgoingSerializationMode::kLazy, outgoing_serialization_mode_);

  // No tracker (e.g. after CancelWait()) means no evidence of a remote peer.
  if (!peer_remoteness_tracker_)
    return false;
  return peer_remoteness_tracker_->last_known_state().peer_remote();
}
// Writes |message| to the message pipe.
//
// Returns false when the Connector is already in an error state or when the
// write is rejected. Returns true when the message was written and also when
// it was silently dropped (pipe invalid, or a previous write found the peer
// gone). Unless the Connector was created with a lock, this must be called on
// the Connector's sequence.
bool Connector::Accept(Message* message) {
  if (!lock_)
    DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  if (error_)
    return false;

  internal::MayAutoLock locker(&lock_);

  const bool can_write = message_pipe_.is_valid() && !drop_writes_;
  if (!can_write)
    return true;

#if defined(ENABLE_IPC_FUZZER)
  if (message_dumper_ && message->is_serialized()) {
    bool dump_result = message_dumper_->Accept(message);
    DCHECK(dump_result);
  }
#endif

  const MojoResult write_result =
      WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(),
                      MOJO_WRITE_MESSAGE_FLAG_NONE);

  if (write_result == MOJO_RESULT_OK)
    return true;

  if (write_result == MOJO_RESULT_FAILED_PRECONDITION) {
    // The other end is gone, so further writes are pointless and will be
    // dropped from now on. The failure is hidden from the caller so that it
    // keeps draining any backlog of incoming messages before it regards the
    // message pipe as closed.
    drop_writes_ = true;
    return true;
  }

  if (write_result == MOJO_RESULT_BUSY) {
    // We'd get a "busy" result if one of the message's handles is:
    //   - |message_pipe_|'s own handle;
    //   - simultaneously being used on another sequence; or
    //   - in a "busy" state that prohibits it from being transferred (e.g.,
    //     a data pipe handle in the middle of a two-phase read/write,
    //     regardless of which sequence that two-phase read/write is happening
    //     on).
    // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until
    // crbug.com/389666, etc. are resolved, this will make tests fail quickly
    // rather than hanging.)
    CHECK(false) << "Race condition or other bug detected";
    return false;
  }

  // Any other result: this particular write was rejected, presumably because
  // of bad input. The pipe itself is not necessarily in a bad state.
  return false;
}
// Opts this Connector's sync watcher into
// AllowWokenUpBySyncWatchOnSameThread(). The choice is remembered in
// |allow_woken_up_by_others_| so that WaitToReadMore() can re-apply it after
// the watchers have been torn down and recreated. Must be called on the
// Connector's sequence.
void Connector::AllowWokenUpBySyncWatchOnSameThread() {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
allow_woken_up_by_others_ = true;
// Create the sync watcher on demand so the setting can be applied right away.
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
// Runs a synchronous watch on the message pipe through |sync_watcher_|.
// Returns false immediately when an error has already been recorded;
// otherwise returns whatever SyncHandleWatcher::SyncWatch() returns.
// |should_stop| is forwarded unchanged to the sync watcher. Incoming
// processing is resumed first if it was paused. Must be called on the
// Connector's sequence.
bool Connector::SyncWatch(const bool* should_stop) {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
if (error_)
return false;
ResumeIncomingMethodCallProcessing();
// The sync watcher is created lazily; CancelWait() may have destroyed it.
EnsureSyncWatcherExists();
return sync_watcher_->SyncWatch(should_stop);
}
// Records |tag| as the heap profiler tag and applies it to the current
// handle watcher, if one exists. Watchers created later pick it up in
// WaitToReadMore(). A null |tag| is ignored.
void Connector::SetWatcherHeapProfilerTag(const char* tag) {
  if (!tag)
    return;

  heap_profiler_tag_ = tag;
  if (handle_watcher_)
    handle_watcher_->set_heap_profiler_tag(tag);
}
// static
// Replaces the process-wide default serialization modes. The Connector
// constructor copies these values into each new instance, so only Connectors
// constructed after this call observe the new defaults.
void Connector::OverrideDefaultSerializationBehaviorForTesting(
OutgoingSerializationMode outgoing_mode,
IncomingSerializationMode incoming_mode) {
g_default_outgoing_serialization_mode = outgoing_mode;
g_default_incoming_serialization_mode = incoming_mode;
}
// Readiness callback for |handle_watcher_|, bound in WaitToReadMore().
// WaitToReadMore() also posts it directly, with the failing result, when
// Watch() itself fails. All work is delegated to OnHandleReadyInternal().
void Connector::OnWatcherHandleReady(MojoResult result) {
OnHandleReadyInternal(result);
}
// Readiness callback for |sync_watcher_|, bound in EnsureSyncWatcherExists().
// Keeps |sync_handle_watcher_callback_count_| raised for the duration of the
// dispatch.
void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) {
  // Taken before dispatch so we can tell afterwards whether |this| survived
  // (or whether the weak pointers were invalidated by PassMessagePipe()).
  base::WeakPtr<Connector> self_guard(weak_self_);

  ++sync_handle_watcher_callback_count_;
  OnHandleReadyInternal(result);

  // At this point, this object might have been deleted; touch no members
  // unless the guard is still valid.
  if (!self_guard)
    return;

  DCHECK_LT(0u, sync_handle_watcher_callback_count_);
  --sync_handle_watcher_callback_count_;
}
// Shared body of both readiness callbacks. On MOJO_RESULT_OK it drains the
// pipe; on any other result it reports an error, forcing a pipe reset unless
// the result is MOJO_RESULT_FAILED_PRECONDITION.
void Connector::OnHandleReadyInternal(MojoResult result) {
  DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);

  if (result == MOJO_RESULT_OK) {
    // This object might have been deleted by the time the call returns, so
    // nothing may follow it.
    ReadAllAvailableMessages();
    return;
  }

  HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false);
}
void Connector::WaitToReadMore() {
CHECK(!paused_);
DCHECK(!handle_watcher_);
handle_watcher_.reset(new SimpleWatcher(
FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_));
handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_);
MojoResult rv = handle_watcher_->Watch(
message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this)));
if (message_pipe_.is_valid()) {
peer_remoteness_tracker_.emplace(message_pipe_.get(),
MOJO_HANDLE_SIGNAL_PEER_REMOTE);
}
if (rv != MOJO_RESULT_OK) {
// If the watch failed because the handle is invalid or its conditions can
// no longer be met, we signal the error asynchronously to avoid reentry.
task_runner_->PostTask(
FROM_HERE,
base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv));
} else {
handle_watcher_->ArmOrNotify();
}
if (allow_woken_up_by_others_) {
EnsureSyncWatcherExists();
sync_watcher_->AllowWokenUpBySyncWatchOnSameThread();
}
}
// Reads at most one message from |message_pipe_| and dispatches it to
// |incoming_receiver_|. The raw ReadMessage() result is always stored in
// |*read_result|.
//
// Returns true when the caller may keep going: a message was dispatched (and,
// if |enforce_errors_from_incoming_receiver_| is set, accepted by the
// receiver), or there was nothing to read (MOJO_RESULT_SHOULD_WAIT).
// Returns false when the read failed or the receiver's rejection was enforced
// (HandleError() has been called in both cases), or when the weak pointer
// taken before dispatch became invalid during dispatch. In that last case
// |this| may already be destroyed, so the caller must not touch any members.
// Must not be called while paused (CHECKed).
bool Connector::ReadSingleMessage(MojoResult* read_result) {
CHECK(!paused_);
bool receiver_result = false;
// Detect if |this| was destroyed or the message pipe was closed/transferred
// during message dispatch.
base::WeakPtr<Connector> weak_self = weak_self_;
Message message;
const MojoResult rv = ReadMessage(message_pipe_.get(), &message);
*read_result = rv;
if (rv == MOJO_RESULT_OK) {
// Only the outermost dispatch on this Connector registers a tracker, and
// only when the thread has a nesting observer.
base::Optional<ActiveDispatchTracker> dispatch_tracker;
if (!is_dispatching_ && nesting_observer_) {
is_dispatching_ = true;
dispatch_tracker.emplace(weak_self);
}
if (incoming_serialization_mode_ ==
IncomingSerializationMode::kSerializeBeforeDispatchForTesting) {
message.SerializeIfNecessary();
} else {
DCHECK_EQ(IncomingSerializationMode::kDispatchAsIs,
incoming_serialization_mode_);
}
#if !BUILDFLAG(MOJO_TRACE_ENABLED)
// This emits just full class name, and is inferior to mojo tracing.
TRACE_EVENT0("mojom", heap_profiler_tag_);
#endif
// A missing receiver counts as a rejected message.
receiver_result =
incoming_receiver_ && incoming_receiver_->Accept(&message);
// Bail out before touching any member if dispatch invalidated the weak
// pointer.
if (!weak_self)
return false;
if (dispatch_tracker) {
is_dispatching_ = false;
dispatch_tracker.reset();
}
} else if (rv == MOJO_RESULT_SHOULD_WAIT) {
// Nothing to read right now; not an error.
return true;
} else {
// Any result other than FAILED_PRECONDITION forces a pipe reset.
HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false);
return false;
}
// Only reached after a successful read and dispatch.
if (enforce_errors_from_incoming_receiver_ && !receiver_result) {
HandleError(true, false);
return false;
}
return true;
}
// Reads and dispatches messages until the pipe is drained and the watcher has
// been re-armed, an error occurs, processing is paused, or the weak pointer
// is invalidated during dispatch. The Connector may have been destroyed by
// the time this returns.
void Connector::ReadAllAvailableMessages() {
while (!error_) {
// Taken fresh on each iteration so that invalidation during this dispatch
// is detected.
base::WeakPtr<Connector> weak_self = weak_self_;
MojoResult rv;
// May delete |this|.
if (!ReadSingleMessage(&rv))
return;
// Stop if the weak pointer was invalidated or the receiver paused us
// during dispatch.
if (!weak_self || paused_)
return;
DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT);
if (rv == MOJO_RESULT_SHOULD_WAIT) {
// Attempt to re-arm the Watcher.
MojoResult ready_result;
MojoResult arm_result = handle_watcher_->Arm(&ready_result);
// Armed successfully: the watcher will call back when there is more to
// read.
if (arm_result == MOJO_RESULT_OK)
return;
// The watcher is already ready to notify again.
DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result);
if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) {
// Report the error without forcing a pipe reset and with the handler run
// synchronously.
HandleError(false, false);
return;
}
// There's more to read now, so we'll just keep looping.
DCHECK_EQ(MOJO_RESULT_OK, ready_result);
}
}
}
// Destroys the peer-remoteness tracker, the asynchronous handle watcher and
// the sync watcher. WaitToReadMore() recreates the handle watcher (and the
// tracker when the pipe is valid); the sync watcher is recreated on demand by
// EnsureSyncWatcherExists().
void Connector::CancelWait() {
peer_remoteness_tracker_.reset();
handle_watcher_.reset();
sync_watcher_.reset();
}
// Central error path. Does nothing when an error was already recorded or the
// pipe is invalid.
//
// |force_pipe_reset|: replace |message_pipe_| with one end of a fresh dummy
// pipe whose other end is dropped at once.
// |force_async_handler|: instead of running the error handler now, start
// watching the (reset) pipe again. Asynchronous delivery is always used
// while paused, and it always implies a pipe reset.
void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) {
  if (error_ || !message_pipe_.is_valid())
    return;

  // While the user has paused receiving messages the handler must not run
  // inline; it has to wait until the user starts receiving messages again.
  const bool notify_async = force_async_handler || paused_;
  const bool reset_pipe = force_pipe_reset || notify_async;

  CancelWait();
  if (reset_pipe) {
    internal::MayAutoLock locker(&lock_);
    message_pipe_.reset();
    MessagePipe dummy_pipe;
    message_pipe_ = std::move(dummy_pipe.handle0);
  }

  if (!notify_async) {
    error_ = true;
    if (connection_error_handler_)
      std::move(connection_error_handler_).Run();
    return;
  }

  // While paused, watching starts again when
  // ResumeIncomingMethodCallProcessing() calls WaitToReadMore().
  if (!paused_)
    WaitToReadMore();
}
// Lazily creates |sync_watcher_|, which watches the pipe for readability and
// reports to OnSyncHandleWatcherHandleReady(). Does nothing when the watcher
// already exists.
void Connector::EnsureSyncWatcherExists() {
  if (!sync_watcher_) {
    sync_watcher_.reset(new SyncHandleWatcher(
        message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE,
        base::Bind(&Connector::OnSyncHandleWatcherHandleReady,
                   base::Unretained(this))));
  }
}
} // namespace mojo