// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/public/cpp/bindings/connector.h" #include <stdint.h> #include "base/bind.h" #include "base/lazy_instance.h" #include "base/location.h" #include "base/logging.h" #include "base/macros.h" #include "base/memory/ptr_util.h" #include "base/message_loop/message_loop_current.h" #include "base/run_loop.h" #include "base/synchronization/lock.h" #include "base/threading/thread_local.h" #include "base/trace_event/trace_event.h" #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" #include "mojo/public/cpp/bindings/mojo_buildflags.h" #include "mojo/public/cpp/bindings/sync_handle_watcher.h" #include "mojo/public/cpp/system/wait.h" #if defined(ENABLE_IPC_FUZZER) #include "mojo/public/cpp/bindings/message_dumper.h" #endif namespace mojo { namespace { // The NestingObserver for each thread. Note that this is always a // Connector::RunLoopNestingObserver; we use the base type here because that // subclass is private to Connector. base::LazyInstance<base::ThreadLocalPointer<base::RunLoop::NestingObserver>>:: Leaky g_tls_nesting_observer = LAZY_INSTANCE_INITIALIZER; // The default outgoing serialization mode for new Connectors. Connector::OutgoingSerializationMode g_default_outgoing_serialization_mode = Connector::OutgoingSerializationMode::kLazy; // The default incoming serialization mode for new Connectors. Connector::IncomingSerializationMode g_default_incoming_serialization_mode = Connector::IncomingSerializationMode::kDispatchAsIs; } // namespace // Used to efficiently maintain a doubly-linked list of all Connectors // currently dispatching on any given thread. class Connector::ActiveDispatchTracker { public: explicit ActiveDispatchTracker(const base::WeakPtr<Connector>& connector); ~ActiveDispatchTracker(); void NotifyBeginNesting(); private: const base::WeakPtr<Connector> connector_; RunLoopNestingObserver* const nesting_observer_; ActiveDispatchTracker* outer_tracker_ = nullptr; ActiveDispatchTracker* inner_tracker_ = nullptr; DISALLOW_COPY_AND_ASSIGN(ActiveDispatchTracker); }; // Watches the MessageLoop on the current thread. Notifies the current chain of // ActiveDispatchTrackers when a nested run loop is started. class Connector::RunLoopNestingObserver : public base::RunLoop::NestingObserver, public base::MessageLoopCurrent::DestructionObserver { public: RunLoopNestingObserver() { base::RunLoop::AddNestingObserverOnCurrentThread(this); base::MessageLoopCurrent::Get()->AddDestructionObserver(this); } ~RunLoopNestingObserver() override {} // base::RunLoop::NestingObserver: void OnBeginNestedRunLoop() override { if (top_tracker_) top_tracker_->NotifyBeginNesting(); } // base::MessageLoopCurrent::DestructionObserver: void WillDestroyCurrentMessageLoop() override { base::RunLoop::RemoveNestingObserverOnCurrentThread(this); base::MessageLoopCurrent::Get()->RemoveDestructionObserver(this); DCHECK_EQ(this, g_tls_nesting_observer.Get().Get()); g_tls_nesting_observer.Get().Set(nullptr); delete this; } static RunLoopNestingObserver* GetForThread() { if (!base::MessageLoopCurrent::Get()) return nullptr; auto* observer = static_cast<RunLoopNestingObserver*>( g_tls_nesting_observer.Get().Get()); if (!observer) { observer = new RunLoopNestingObserver; g_tls_nesting_observer.Get().Set(observer); } return observer; } private: friend class ActiveDispatchTracker; ActiveDispatchTracker* top_tracker_ = nullptr; DISALLOW_COPY_AND_ASSIGN(RunLoopNestingObserver); }; Connector::ActiveDispatchTracker::ActiveDispatchTracker( const base::WeakPtr<Connector>& connector) : connector_(connector), nesting_observer_(connector_->nesting_observer_) { DCHECK(nesting_observer_); if (nesting_observer_->top_tracker_) { outer_tracker_ = nesting_observer_->top_tracker_; outer_tracker_->inner_tracker_ = this; } nesting_observer_->top_tracker_ = this; } Connector::ActiveDispatchTracker::~ActiveDispatchTracker() { if (nesting_observer_->top_tracker_ == this) nesting_observer_->top_tracker_ = outer_tracker_; else if (inner_tracker_) inner_tracker_->outer_tracker_ = outer_tracker_; if (outer_tracker_) outer_tracker_->inner_tracker_ = inner_tracker_; } void Connector::ActiveDispatchTracker::NotifyBeginNesting() { if (connector_ && connector_->handle_watcher_) connector_->handle_watcher_->ArmOrNotify(); if (outer_tracker_) outer_tracker_->NotifyBeginNesting(); } Connector::Connector(ScopedMessagePipeHandle message_pipe, ConnectorConfig config, scoped_refptr<base::SequencedTaskRunner> runner) : message_pipe_(std::move(message_pipe)), task_runner_(std::move(runner)), error_(false), outgoing_serialization_mode_(g_default_outgoing_serialization_mode), incoming_serialization_mode_(g_default_incoming_serialization_mode), nesting_observer_(RunLoopNestingObserver::GetForThread()), weak_factory_(this) { if (config == MULTI_THREADED_SEND) lock_.emplace(); #if defined(ENABLE_IPC_FUZZER) if (!MessageDumper::GetMessageDumpDirectory().empty()) message_dumper_ = std::make_unique<MessageDumper>(); #endif weak_self_ = weak_factory_.GetWeakPtr(); // Even though we don't have an incoming receiver, we still want to monitor // the message pipe to know if is closed or encounters an error. WaitToReadMore(); } Connector::~Connector() { { // Allow for quick destruction on any sequence if the pipe is already // closed. base::AutoLock lock(connected_lock_); if (!connected_) return; } DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); CancelWait(); } void Connector::SetOutgoingSerializationMode(OutgoingSerializationMode mode) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); outgoing_serialization_mode_ = mode; } void Connector::SetIncomingSerializationMode(IncomingSerializationMode mode) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); incoming_serialization_mode_ = mode; } void Connector::CloseMessagePipe() { // Throw away the returned message pipe. PassMessagePipe(); } ScopedMessagePipeHandle Connector::PassMessagePipe() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); CancelWait(); internal::MayAutoLock locker(&lock_); ScopedMessagePipeHandle message_pipe = std::move(message_pipe_); weak_factory_.InvalidateWeakPtrs(); sync_handle_watcher_callback_count_ = 0; base::AutoLock lock(connected_lock_); connected_ = false; return message_pipe; } void Connector::RaiseError() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); HandleError(true, true); } bool Connector::WaitForIncomingMessage(MojoDeadline deadline) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); if (error_) return false; ResumeIncomingMethodCallProcessing(); // TODO(rockot): Use a timed Wait here. Nobody uses anything but 0 or // INDEFINITE deadlines at present, so we only support those. DCHECK(deadline == 0 || deadline == MOJO_DEADLINE_INDEFINITE); MojoResult rv = MOJO_RESULT_UNKNOWN; if (deadline == 0 && !message_pipe_->QuerySignalsState().readable()) return false; if (deadline == MOJO_DEADLINE_INDEFINITE) { rv = Wait(message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE); if (rv != MOJO_RESULT_OK) { // Users that call WaitForIncomingMessage() should expect their code to be // re-entered, so we call the error handler synchronously. HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); return false; } } ignore_result(ReadSingleMessage(&rv)); return (rv == MOJO_RESULT_OK); } void Connector::PauseIncomingMethodCallProcessing() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); if (paused_) return; paused_ = true; CancelWait(); } void Connector::ResumeIncomingMethodCallProcessing() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); if (!paused_) return; paused_ = false; WaitToReadMore(); } bool Connector::PrefersSerializedMessages() { if (outgoing_serialization_mode_ == OutgoingSerializationMode::kEager) return true; DCHECK_EQ(OutgoingSerializationMode::kLazy, outgoing_serialization_mode_); return peer_remoteness_tracker_ && peer_remoteness_tracker_->last_known_state().peer_remote(); } bool Connector::Accept(Message* message) { if (!lock_) DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); if (error_) return false; internal::MayAutoLock locker(&lock_); if (!message_pipe_.is_valid() || drop_writes_) return true; #if defined(ENABLE_IPC_FUZZER) if (message_dumper_ && message->is_serialized()) { bool dump_result = message_dumper_->Accept(message); DCHECK(dump_result); } #endif MojoResult rv = WriteMessageNew(message_pipe_.get(), message->TakeMojoMessage(), MOJO_WRITE_MESSAGE_FLAG_NONE); switch (rv) { case MOJO_RESULT_OK: break; case MOJO_RESULT_FAILED_PRECONDITION: // There's no point in continuing to write to this pipe since the other // end is gone. Avoid writing any future messages. Hide write failures // from the caller since we'd like them to continue consuming any backlog // of incoming messages before regarding the message pipe as closed. drop_writes_ = true; break; case MOJO_RESULT_BUSY: // We'd get a "busy" result if one of the message's handles is: // - |message_pipe_|'s own handle; // - simultaneously being used on another sequence; or // - in a "busy" state that prohibits it from being transferred (e.g., // a data pipe handle in the middle of a two-phase read/write, // regardless of which sequence that two-phase read/write is happening // on). // TODO(vtl): I wonder if this should be a |DCHECK()|. (But, until // crbug.com/389666, etc. are resolved, this will make tests fail quickly // rather than hanging.) CHECK(false) << "Race condition or other bug detected"; return false; default: // This particular write was rejected, presumably because of bad input. // The pipe is not necessarily in a bad state. return false; } return true; } void Connector::AllowWokenUpBySyncWatchOnSameThread() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); allow_woken_up_by_others_ = true; EnsureSyncWatcherExists(); sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); } bool Connector::SyncWatch(const bool* should_stop) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); if (error_) return false; ResumeIncomingMethodCallProcessing(); EnsureSyncWatcherExists(); return sync_watcher_->SyncWatch(should_stop); } void Connector::SetWatcherHeapProfilerTag(const char* tag) { if (tag) { heap_profiler_tag_ = tag; if (handle_watcher_) handle_watcher_->set_heap_profiler_tag(tag); } } // static void Connector::OverrideDefaultSerializationBehaviorForTesting( OutgoingSerializationMode outgoing_mode, IncomingSerializationMode incoming_mode) { g_default_outgoing_serialization_mode = outgoing_mode; g_default_incoming_serialization_mode = incoming_mode; } void Connector::OnWatcherHandleReady(MojoResult result) { OnHandleReadyInternal(result); } void Connector::OnSyncHandleWatcherHandleReady(MojoResult result) { base::WeakPtr<Connector> weak_self(weak_self_); sync_handle_watcher_callback_count_++; OnHandleReadyInternal(result); // At this point, this object might have been deleted. if (weak_self) { DCHECK_LT(0u, sync_handle_watcher_callback_count_); sync_handle_watcher_callback_count_--; } } void Connector::OnHandleReadyInternal(MojoResult result) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); if (result != MOJO_RESULT_OK) { HandleError(result != MOJO_RESULT_FAILED_PRECONDITION, false); return; } ReadAllAvailableMessages(); // At this point, this object might have been deleted. Return. } void Connector::WaitToReadMore() { CHECK(!paused_); DCHECK(!handle_watcher_); handle_watcher_.reset(new SimpleWatcher( FROM_HERE, SimpleWatcher::ArmingPolicy::MANUAL, task_runner_)); handle_watcher_->set_heap_profiler_tag(heap_profiler_tag_); MojoResult rv = handle_watcher_->Watch( message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, base::Bind(&Connector::OnWatcherHandleReady, base::Unretained(this))); if (message_pipe_.is_valid()) { peer_remoteness_tracker_.emplace(message_pipe_.get(), MOJO_HANDLE_SIGNAL_PEER_REMOTE); } if (rv != MOJO_RESULT_OK) { // If the watch failed because the handle is invalid or its conditions can // no longer be met, we signal the error asynchronously to avoid reentry. task_runner_->PostTask( FROM_HERE, base::Bind(&Connector::OnWatcherHandleReady, weak_self_, rv)); } else { handle_watcher_->ArmOrNotify(); } if (allow_woken_up_by_others_) { EnsureSyncWatcherExists(); sync_watcher_->AllowWokenUpBySyncWatchOnSameThread(); } } bool Connector::ReadSingleMessage(MojoResult* read_result) { CHECK(!paused_); bool receiver_result = false; // Detect if |this| was destroyed or the message pipe was closed/transferred // during message dispatch. base::WeakPtr<Connector> weak_self = weak_self_; Message message; const MojoResult rv = ReadMessage(message_pipe_.get(), &message); *read_result = rv; if (rv == MOJO_RESULT_OK) { base::Optional<ActiveDispatchTracker> dispatch_tracker; if (!is_dispatching_ && nesting_observer_) { is_dispatching_ = true; dispatch_tracker.emplace(weak_self); } if (incoming_serialization_mode_ == IncomingSerializationMode::kSerializeBeforeDispatchForTesting) { message.SerializeIfNecessary(); } else { DCHECK_EQ(IncomingSerializationMode::kDispatchAsIs, incoming_serialization_mode_); } #if !BUILDFLAG(MOJO_TRACE_ENABLED) // This emits just full class name, and is inferior to mojo tracing. TRACE_EVENT0("mojom", heap_profiler_tag_); #endif receiver_result = incoming_receiver_ && incoming_receiver_->Accept(&message); if (!weak_self) return false; if (dispatch_tracker) { is_dispatching_ = false; dispatch_tracker.reset(); } } else if (rv == MOJO_RESULT_SHOULD_WAIT) { return true; } else { HandleError(rv != MOJO_RESULT_FAILED_PRECONDITION, false); return false; } if (enforce_errors_from_incoming_receiver_ && !receiver_result) { HandleError(true, false); return false; } return true; } void Connector::ReadAllAvailableMessages() { while (!error_) { base::WeakPtr<Connector> weak_self = weak_self_; MojoResult rv; // May delete |this.| if (!ReadSingleMessage(&rv)) return; if (!weak_self || paused_) return; DCHECK(rv == MOJO_RESULT_OK || rv == MOJO_RESULT_SHOULD_WAIT); if (rv == MOJO_RESULT_SHOULD_WAIT) { // Attempt to re-arm the Watcher. MojoResult ready_result; MojoResult arm_result = handle_watcher_->Arm(&ready_result); if (arm_result == MOJO_RESULT_OK) return; // The watcher is already ready to notify again. DCHECK_EQ(MOJO_RESULT_FAILED_PRECONDITION, arm_result); if (ready_result == MOJO_RESULT_FAILED_PRECONDITION) { HandleError(false, false); return; } // There's more to read now, so we'll just keep looping. DCHECK_EQ(MOJO_RESULT_OK, ready_result); } } } void Connector::CancelWait() { peer_remoteness_tracker_.reset(); handle_watcher_.reset(); sync_watcher_.reset(); } void Connector::HandleError(bool force_pipe_reset, bool force_async_handler) { if (error_ || !message_pipe_.is_valid()) return; if (paused_) { // Enforce calling the error handler asynchronously if the user has paused // receiving messages. We need to wait until the user starts receiving // messages again. force_async_handler = true; } if (!force_pipe_reset && force_async_handler) force_pipe_reset = true; if (force_pipe_reset) { CancelWait(); internal::MayAutoLock locker(&lock_); message_pipe_.reset(); MessagePipe dummy_pipe; message_pipe_ = std::move(dummy_pipe.handle0); } else { CancelWait(); } if (force_async_handler) { if (!paused_) WaitToReadMore(); } else { error_ = true; if (connection_error_handler_) std::move(connection_error_handler_).Run(); } } void Connector::EnsureSyncWatcherExists() { if (sync_watcher_) return; sync_watcher_.reset(new SyncHandleWatcher( message_pipe_.get(), MOJO_HANDLE_SIGNAL_READABLE, base::Bind(&Connector::OnSyncHandleWatcherHandleReady, base::Unretained(this)))); } } // namespace mojo