// Copyright 2015 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/public/cpp/bindings/lib/multiplex_router.h" #include <stdint.h> #include <utility> #include "base/bind.h" #include "base/location.h" #include "base/macros.h" #include "base/memory/ptr_util.h" #include "base/sequenced_task_runner.h" #include "base/stl_util.h" #include "base/synchronization/waitable_event.h" #include "mojo/public/cpp/bindings/interface_endpoint_client.h" #include "mojo/public/cpp/bindings/interface_endpoint_controller.h" #include "mojo/public/cpp/bindings/lib/may_auto_lock.h" #include "mojo/public/cpp/bindings/sequence_local_sync_event_watcher.h" namespace mojo { namespace internal { // InterfaceEndpoint stores the information of an interface endpoint registered // with the router. // No one other than the router's |endpoints_| and |tasks_| should hold refs to // this object. class MultiplexRouter::InterfaceEndpoint : public base::RefCountedThreadSafe<InterfaceEndpoint>, public InterfaceEndpointController { public: InterfaceEndpoint(MultiplexRouter* router, InterfaceId id) : router_(router), id_(id), closed_(false), peer_closed_(false), handle_created_(false), client_(nullptr) {} // --------------------------------------------------------------------------- // The following public methods are safe to call from any sequence without // locking. InterfaceId id() const { return id_; } // --------------------------------------------------------------------------- // The following public methods are called under the router's lock. bool closed() const { return closed_; } void set_closed() { router_->AssertLockAcquired(); closed_ = true; } bool peer_closed() const { return peer_closed_; } void set_peer_closed() { router_->AssertLockAcquired(); peer_closed_ = true; } bool handle_created() const { return handle_created_; } void set_handle_created() { router_->AssertLockAcquired(); handle_created_ = true; } const base::Optional<DisconnectReason>& disconnect_reason() const { return disconnect_reason_; } void set_disconnect_reason( const base::Optional<DisconnectReason>& disconnect_reason) { router_->AssertLockAcquired(); disconnect_reason_ = disconnect_reason; } base::SequencedTaskRunner* task_runner() const { return task_runner_.get(); } InterfaceEndpointClient* client() const { return client_; } void AttachClient(InterfaceEndpointClient* client, scoped_refptr<base::SequencedTaskRunner> runner) { router_->AssertLockAcquired(); DCHECK(!client_); DCHECK(!closed_); DCHECK(runner->RunsTasksInCurrentSequence()); task_runner_ = std::move(runner); client_ = client; } // This method must be called on the same sequence as the corresponding // AttachClient() call. void DetachClient() { router_->AssertLockAcquired(); DCHECK(client_); DCHECK(task_runner_->RunsTasksInCurrentSequence()); DCHECK(!closed_); task_runner_ = nullptr; client_ = nullptr; sync_watcher_.reset(); } void SignalSyncMessageEvent() { router_->AssertLockAcquired(); if (sync_message_event_signaled_) return; sync_message_event_signaled_ = true; if (sync_watcher_) sync_watcher_->SignalEvent(); } void ResetSyncMessageSignal() { router_->AssertLockAcquired(); if (!sync_message_event_signaled_) return; sync_message_event_signaled_ = false; if (sync_watcher_) sync_watcher_->ResetEvent(); } // --------------------------------------------------------------------------- // The following public methods (i.e., InterfaceEndpointController // implementation) are called by the client on the same sequence as the // AttachClient() call. They are called outside of the router's lock. bool SendMessage(Message* message) override { DCHECK(task_runner_->RunsTasksInCurrentSequence()); message->set_interface_id(id_); return router_->connector_.Accept(message); } void AllowWokenUpBySyncWatchOnSameThread() override { DCHECK(task_runner_->RunsTasksInCurrentSequence()); EnsureSyncWatcherExists(); sync_watcher_->AllowWokenUpBySyncWatchOnSameSequence(); } bool SyncWatch(const bool* should_stop) override { DCHECK(task_runner_->RunsTasksInCurrentSequence()); EnsureSyncWatcherExists(); return sync_watcher_->SyncWatch(should_stop); } private: friend class base::RefCountedThreadSafe<InterfaceEndpoint>; ~InterfaceEndpoint() override { router_->AssertLockAcquired(); DCHECK(!client_); } void OnSyncEventSignaled() { DCHECK(task_runner_->RunsTasksInCurrentSequence()); scoped_refptr<MultiplexRouter> router_protector(router_); MayAutoLock locker(&router_->lock_); scoped_refptr<InterfaceEndpoint> self_protector(this); bool more_to_process = router_->ProcessFirstSyncMessageForEndpoint(id_); if (!more_to_process) ResetSyncMessageSignal(); // Currently there are no queued sync messages and the peer has closed so // there won't be incoming sync messages in the future. if (!more_to_process && peer_closed_) { // If a SyncWatch() call (or multiple ones) of this interface endpoint is // on the call stack, resetting the sync watcher will allow it to exit // when the call stack unwinds to that frame. sync_watcher_.reset(); } } void EnsureSyncWatcherExists() { DCHECK(task_runner_->RunsTasksInCurrentSequence()); if (sync_watcher_) return; MayAutoLock locker(&router_->lock_); sync_watcher_ = std::make_unique<SequenceLocalSyncEventWatcher>(base::BindRepeating( &InterfaceEndpoint::OnSyncEventSignaled, base::Unretained(this))); if (sync_message_event_signaled_) sync_watcher_->SignalEvent(); } // --------------------------------------------------------------------------- // The following members are safe to access from any sequence. MultiplexRouter* const router_; const InterfaceId id_; // --------------------------------------------------------------------------- // The following members are accessed under the router's lock. // Whether the endpoint has been closed. bool closed_; // Whether the peer endpoint has been closed. bool peer_closed_; // Whether there is already a ScopedInterfaceEndpointHandle created for this // endpoint. bool handle_created_; base::Optional<DisconnectReason> disconnect_reason_; // The task runner on which |client_|'s methods can be called. scoped_refptr<base::SequencedTaskRunner> task_runner_; // Not owned. It is null if no client is attached to this endpoint. InterfaceEndpointClient* client_; // Indicates whether the sync watcher should be signaled for this endpoint. bool sync_message_event_signaled_ = false; // Guarded by the router's lock. Used to synchronously wait on replies. std::unique_ptr<SequenceLocalSyncEventWatcher> sync_watcher_; DISALLOW_COPY_AND_ASSIGN(InterfaceEndpoint); }; // MessageWrapper objects are always destroyed under the router's lock. On // destruction, if the message it wrappers contains interface IDs, the wrapper // closes the corresponding endpoints. class MultiplexRouter::MessageWrapper { public: MessageWrapper() = default; MessageWrapper(MultiplexRouter* router, Message message) : router_(router), value_(std::move(message)) {} MessageWrapper(MessageWrapper&& other) : router_(other.router_), value_(std::move(other.value_)) {} ~MessageWrapper() { if (!router_ || value_.IsNull()) return; router_->AssertLockAcquired(); // Don't try to close the endpoints if at this point the router is already // half-destructed. if (!router_->being_destructed_) router_->CloseEndpointsForMessage(value_); } MessageWrapper& operator=(MessageWrapper&& other) { router_ = other.router_; value_ = std::move(other.value_); return *this; } const Message& value() const { return value_; } // Must be called outside of the router's lock. // Returns a null message if it fails to deseralize the associated endpoint // handles. Message DeserializeEndpointHandlesAndTake() { if (!value_.DeserializeAssociatedEndpointHandles(router_)) { // The previous call may have deserialized part of the associated // interface endpoint handles. They must be destroyed outside of the // router's lock, so we cannot wait until destruction of MessageWrapper. value_.Reset(); return Message(); } return std::move(value_); } private: MultiplexRouter* router_ = nullptr; Message value_; DISALLOW_COPY_AND_ASSIGN(MessageWrapper); }; struct MultiplexRouter::Task { public: // Doesn't take ownership of |message| but takes its contents. static std::unique_ptr<Task> CreateMessageTask( MessageWrapper message_wrapper) { Task* task = new Task(MESSAGE); task->message_wrapper = std::move(message_wrapper); return base::WrapUnique(task); } static std::unique_ptr<Task> CreateNotifyErrorTask( InterfaceEndpoint* endpoint) { Task* task = new Task(NOTIFY_ERROR); task->endpoint_to_notify = endpoint; return base::WrapUnique(task); } ~Task() {} bool IsMessageTask() const { return type == MESSAGE; } bool IsNotifyErrorTask() const { return type == NOTIFY_ERROR; } MessageWrapper message_wrapper; scoped_refptr<InterfaceEndpoint> endpoint_to_notify; enum Type { MESSAGE, NOTIFY_ERROR }; Type type; private: explicit Task(Type in_type) : type(in_type) {} DISALLOW_COPY_AND_ASSIGN(Task); }; MultiplexRouter::MultiplexRouter( ScopedMessagePipeHandle message_pipe, Config config, bool set_interface_id_namesapce_bit, scoped_refptr<base::SequencedTaskRunner> runner) : set_interface_id_namespace_bit_(set_interface_id_namesapce_bit), task_runner_(runner), filters_(this), connector_(std::move(message_pipe), config == MULTI_INTERFACE ? Connector::MULTI_THREADED_SEND : Connector::SINGLE_THREADED_SEND, std::move(runner)), control_message_handler_(this), control_message_proxy_(&connector_) { DCHECK(task_runner_->RunsTasksInCurrentSequence()); if (config == MULTI_INTERFACE) lock_.emplace(); if (config == SINGLE_INTERFACE_WITH_SYNC_METHODS || config == MULTI_INTERFACE) { // Always participate in sync handle watching in multi-interface mode, // because even if it doesn't expect sync requests during sync handle // watching, it may still need to dispatch messages to associated endpoints // on a different sequence. connector_.AllowWokenUpBySyncWatchOnSameThread(); } connector_.set_incoming_receiver(&filters_); connector_.set_connection_error_handler(base::Bind( &MultiplexRouter::OnPipeConnectionError, base::Unretained(this))); std::unique_ptr<MessageHeaderValidator> header_validator = std::make_unique<MessageHeaderValidator>(); header_validator_ = header_validator.get(); filters_.Append(std::move(header_validator)); } MultiplexRouter::~MultiplexRouter() { MayAutoLock locker(&lock_); being_destructed_ = true; sync_message_tasks_.clear(); tasks_.clear(); endpoints_.clear(); } void MultiplexRouter::AddIncomingMessageFilter( std::unique_ptr<MessageReceiver> filter) { filters_.Append(std::move(filter)); } void MultiplexRouter::SetMasterInterfaceName(const char* name) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); header_validator_->SetDescription(std::string(name) + " [master] MessageHeaderValidator"); control_message_handler_.SetDescription( std::string(name) + " [master] PipeControlMessageHandler"); connector_.SetWatcherHeapProfilerTag(name); } InterfaceId MultiplexRouter::AssociateInterface( ScopedInterfaceEndpointHandle handle_to_send) { if (!handle_to_send.pending_association()) return kInvalidInterfaceId; uint32_t id = 0; { MayAutoLock locker(&lock_); do { if (next_interface_id_value_ >= kInterfaceIdNamespaceMask) next_interface_id_value_ = 1; id = next_interface_id_value_++; if (set_interface_id_namespace_bit_) id |= kInterfaceIdNamespaceMask; } while (base::ContainsKey(endpoints_, id)); InterfaceEndpoint* endpoint = new InterfaceEndpoint(this, id); endpoints_[id] = endpoint; if (encountered_error_) UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); endpoint->set_handle_created(); } if (!NotifyAssociation(&handle_to_send, id)) { // The peer handle of |handle_to_send|, which is supposed to join this // associated group, has been closed. { MayAutoLock locker(&lock_); InterfaceEndpoint* endpoint = FindEndpoint(id); if (endpoint) UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); } control_message_proxy_.NotifyPeerEndpointClosed( id, handle_to_send.disconnect_reason()); } return id; } ScopedInterfaceEndpointHandle MultiplexRouter::CreateLocalEndpointHandle( InterfaceId id) { if (!IsValidInterfaceId(id)) return ScopedInterfaceEndpointHandle(); MayAutoLock locker(&lock_); bool inserted = false; InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, &inserted); if (inserted) { if (encountered_error_) UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); } else { if (endpoint->handle_created() || endpoint->closed()) return ScopedInterfaceEndpointHandle(); } endpoint->set_handle_created(); return CreateScopedInterfaceEndpointHandle(id); } void MultiplexRouter::CloseEndpointHandle( InterfaceId id, const base::Optional<DisconnectReason>& reason) { if (!IsValidInterfaceId(id)) return; MayAutoLock locker(&lock_); DCHECK(base::ContainsKey(endpoints_, id)); InterfaceEndpoint* endpoint = endpoints_[id].get(); DCHECK(!endpoint->client()); DCHECK(!endpoint->closed()); UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); if (!IsMasterInterfaceId(id) || reason) { MayAutoUnlock unlocker(&lock_); control_message_proxy_.NotifyPeerEndpointClosed(id, reason); } ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); } InterfaceEndpointController* MultiplexRouter::AttachEndpointClient( const ScopedInterfaceEndpointHandle& handle, InterfaceEndpointClient* client, scoped_refptr<base::SequencedTaskRunner> runner) { const InterfaceId id = handle.id(); DCHECK(IsValidInterfaceId(id)); DCHECK(client); MayAutoLock locker(&lock_); DCHECK(base::ContainsKey(endpoints_, id)); InterfaceEndpoint* endpoint = endpoints_[id].get(); endpoint->AttachClient(client, std::move(runner)); if (endpoint->peer_closed()) tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); return endpoint; } void MultiplexRouter::DetachEndpointClient( const ScopedInterfaceEndpointHandle& handle) { const InterfaceId id = handle.id(); DCHECK(IsValidInterfaceId(id)); MayAutoLock locker(&lock_); DCHECK(base::ContainsKey(endpoints_, id)); InterfaceEndpoint* endpoint = endpoints_[id].get(); endpoint->DetachClient(); } void MultiplexRouter::RaiseError() { if (task_runner_->RunsTasksInCurrentSequence()) { connector_.RaiseError(); } else { task_runner_->PostTask(FROM_HERE, base::Bind(&MultiplexRouter::RaiseError, this)); } } bool MultiplexRouter::PrefersSerializedMessages() { MayAutoLock locker(&lock_); return connector_.PrefersSerializedMessages(); } void MultiplexRouter::CloseMessagePipe() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); connector_.CloseMessagePipe(); // CloseMessagePipe() above won't trigger connection error handler. // Explicitly call OnPipeConnectionError() so that associated endpoints will // get notified. OnPipeConnectionError(); } void MultiplexRouter::PauseIncomingMethodCallProcessing() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); connector_.PauseIncomingMethodCallProcessing(); MayAutoLock locker(&lock_); paused_ = true; for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) iter->second->ResetSyncMessageSignal(); } void MultiplexRouter::ResumeIncomingMethodCallProcessing() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); connector_.ResumeIncomingMethodCallProcessing(); MayAutoLock locker(&lock_); paused_ = false; for (auto iter = endpoints_.begin(); iter != endpoints_.end(); ++iter) { auto sync_iter = sync_message_tasks_.find(iter->first); if (iter->second->peer_closed() || (sync_iter != sync_message_tasks_.end() && !sync_iter->second.empty())) { iter->second->SignalSyncMessageEvent(); } } ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); } bool MultiplexRouter::HasAssociatedEndpoints() const { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); MayAutoLock locker(&lock_); if (endpoints_.size() > 1) return true; if (endpoints_.size() == 0) return false; return !base::ContainsKey(endpoints_, kMasterInterfaceId); } void MultiplexRouter::EnableTestingMode() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); MayAutoLock locker(&lock_); testing_mode_ = true; connector_.set_enforce_errors_from_incoming_receiver(false); } bool MultiplexRouter::Accept(Message* message) { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); // Insert endpoints for the payload interface IDs as soon as the message // arrives, instead of waiting till the message is dispatched. Consider the // following sequence: // 1) Async message msg1 arrives, containing interface ID x. Msg1 is not // dispatched because a sync call is blocking the thread. // 2) Sync message msg2 arrives targeting interface ID x. // // If we don't insert endpoint for interface ID x, when trying to dispatch // msg2 we don't know whether it is an unexpected message or it is just // because the message containing x hasn't been dispatched. if (!InsertEndpointsForMessage(*message)) return false; scoped_refptr<MultiplexRouter> protector(this); MayAutoLock locker(&lock_); DCHECK(!paused_); ClientCallBehavior client_call_behavior = connector_.during_sync_handle_watcher_callback() ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES : ALLOW_DIRECT_CLIENT_CALLS; MessageWrapper message_wrapper(this, std::move(*message)); bool processed = tasks_.empty() && ProcessIncomingMessage( &message_wrapper, client_call_behavior, connector_.task_runner()); if (!processed) { // Either the task queue is not empty or we cannot process the message // directly. In both cases, there is no need to call ProcessTasks(). tasks_.push_back(Task::CreateMessageTask(std::move(message_wrapper))); Task* task = tasks_.back().get(); if (task->message_wrapper.value().has_flag(Message::kFlagIsSync)) { InterfaceId id = task->message_wrapper.value().interface_id(); sync_message_tasks_[id].push_back(task); InterfaceEndpoint* endpoint = FindEndpoint(id); if (endpoint) endpoint->SignalSyncMessageEvent(); } } else if (!tasks_.empty()) { // Processing the message may result in new tasks (for error notification) // being added to the queue. In this case, we have to attempt to process the // tasks. ProcessTasks(client_call_behavior, connector_.task_runner()); } // Always return true. If we see errors during message processing, we will // explicitly call Connector::RaiseError() to disconnect the message pipe. return true; } bool MultiplexRouter::OnPeerAssociatedEndpointClosed( InterfaceId id, const base::Optional<DisconnectReason>& reason) { MayAutoLock locker(&lock_); InterfaceEndpoint* endpoint = FindOrInsertEndpoint(id, nullptr); if (reason) endpoint->set_disconnect_reason(reason); // It is possible that this endpoint has been set as peer closed. That is // because when the message pipe is closed, all the endpoints are updated with // PEER_ENDPOINT_CLOSED. We continue to process remaining tasks in the queue, // as long as there are refs keeping the router alive. If there is a // PeerAssociatedEndpointClosedEvent control message in the queue, we will get // here and see that the endpoint has been marked as peer closed. if (!endpoint->peer_closed()) { if (endpoint->client()) tasks_.push_back(Task::CreateNotifyErrorTask(endpoint)); UpdateEndpointStateMayRemove(endpoint, PEER_ENDPOINT_CLOSED); } // No need to trigger a ProcessTasks() because it is already on the stack. return true; } void MultiplexRouter::OnPipeConnectionError() { DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_); scoped_refptr<MultiplexRouter> protector(this); MayAutoLock locker(&lock_); encountered_error_ = true; // Calling UpdateEndpointStateMayRemove() may remove the corresponding value // from |endpoints_| and invalidate any iterator of |endpoints_|. Therefore, // copy the endpoint pointers to a vector and iterate over it instead. std::vector<scoped_refptr<InterfaceEndpoint>> endpoint_vector; endpoint_vector.reserve(endpoints_.size()); for (const auto& pair : endpoints_) endpoint_vector.push_back(pair.second); for (const auto& endpoint : endpoint_vector) { if (endpoint->client()) tasks_.push_back(Task::CreateNotifyErrorTask(endpoint.get())); UpdateEndpointStateMayRemove(endpoint.get(), PEER_ENDPOINT_CLOSED); } ProcessTasks(connector_.during_sync_handle_watcher_callback() ? ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES : ALLOW_DIRECT_CLIENT_CALLS, connector_.task_runner()); } void MultiplexRouter::ProcessTasks( ClientCallBehavior client_call_behavior, base::SequencedTaskRunner* current_task_runner) { AssertLockAcquired(); if (posted_to_process_tasks_) return; while (!tasks_.empty() && !paused_) { std::unique_ptr<Task> task(std::move(tasks_.front())); tasks_.pop_front(); InterfaceId id = kInvalidInterfaceId; bool sync_message = task->IsMessageTask() && !task->message_wrapper.value().IsNull() && task->message_wrapper.value().has_flag(Message::kFlagIsSync); if (sync_message) { id = task->message_wrapper.value().interface_id(); auto& sync_message_queue = sync_message_tasks_[id]; DCHECK_EQ(task.get(), sync_message_queue.front()); sync_message_queue.pop_front(); } bool processed = task->IsNotifyErrorTask() ? ProcessNotifyErrorTask(task.get(), client_call_behavior, current_task_runner) : ProcessIncomingMessage(&task->message_wrapper, client_call_behavior, current_task_runner); if (!processed) { if (sync_message) { auto& sync_message_queue = sync_message_tasks_[id]; sync_message_queue.push_front(task.get()); } tasks_.push_front(std::move(task)); break; } else { if (sync_message) { auto iter = sync_message_tasks_.find(id); if (iter != sync_message_tasks_.end() && iter->second.empty()) sync_message_tasks_.erase(iter); } } } } bool MultiplexRouter::ProcessFirstSyncMessageForEndpoint(InterfaceId id) { AssertLockAcquired(); auto iter = sync_message_tasks_.find(id); if (iter == sync_message_tasks_.end()) return false; if (paused_) return true; MultiplexRouter::Task* task = iter->second.front(); iter->second.pop_front(); DCHECK(task->IsMessageTask()); MessageWrapper message_wrapper = std::move(task->message_wrapper); // Note: after this call, |task| and |iter| may be invalidated. bool processed = ProcessIncomingMessage( &message_wrapper, ALLOW_DIRECT_CLIENT_CALLS_FOR_SYNC_MESSAGES, nullptr); DCHECK(processed); iter = sync_message_tasks_.find(id); if (iter == sync_message_tasks_.end()) return false; if (iter->second.empty()) { sync_message_tasks_.erase(iter); return false; } return true; } bool MultiplexRouter::ProcessNotifyErrorTask( Task* task, ClientCallBehavior client_call_behavior, base::SequencedTaskRunner* current_task_runner) { DCHECK(!current_task_runner || current_task_runner->RunsTasksInCurrentSequence()); DCHECK(!paused_); AssertLockAcquired(); InterfaceEndpoint* endpoint = task->endpoint_to_notify.get(); if (!endpoint->client()) return true; if (client_call_behavior != ALLOW_DIRECT_CLIENT_CALLS || endpoint->task_runner() != current_task_runner) { MaybePostToProcessTasks(endpoint->task_runner()); return false; } DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); InterfaceEndpointClient* client = endpoint->client(); base::Optional<DisconnectReason> disconnect_reason( endpoint->disconnect_reason()); { // We must unlock before calling into |client| because it may call this // object within NotifyError(). Holding the lock will lead to deadlock. // // It is safe to call into |client| without the lock. Because |client| is // always accessed on the same sequence, including DetachEndpointClient(). MayAutoUnlock unlocker(&lock_); client->NotifyError(disconnect_reason); } return true; } bool MultiplexRouter::ProcessIncomingMessage( MessageWrapper* message_wrapper, ClientCallBehavior client_call_behavior, base::SequencedTaskRunner* current_task_runner) { DCHECK(!current_task_runner || current_task_runner->RunsTasksInCurrentSequence()); DCHECK(!paused_); DCHECK(message_wrapper); AssertLockAcquired(); const Message* message = &message_wrapper->value(); if (message->IsNull()) { // This is a sync message and has been processed during sync handle // watching. return true; } if (PipeControlMessageHandler::IsPipeControlMessage(message)) { bool result = false; { MayAutoUnlock unlocker(&lock_); Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake(); result = !tmp_message.IsNull() && control_message_handler_.Accept(&tmp_message); } if (!result) RaiseErrorInNonTestingMode(); return true; } InterfaceId id = message->interface_id(); DCHECK(IsValidInterfaceId(id)); InterfaceEndpoint* endpoint = FindEndpoint(id); if (!endpoint || endpoint->closed()) return true; if (!endpoint->client()) { // We need to wait until a client is attached in order to dispatch further // messages. return false; } bool can_direct_call; if (message->has_flag(Message::kFlagIsSync)) { can_direct_call = client_call_behavior != NO_DIRECT_CLIENT_CALLS && endpoint->task_runner()->RunsTasksInCurrentSequence(); } else { can_direct_call = client_call_behavior == ALLOW_DIRECT_CLIENT_CALLS && endpoint->task_runner() == current_task_runner; } if (!can_direct_call) { MaybePostToProcessTasks(endpoint->task_runner()); return false; } DCHECK(endpoint->task_runner()->RunsTasksInCurrentSequence()); InterfaceEndpointClient* client = endpoint->client(); bool result = false; { // We must unlock before calling into |client| because it may call this // object within HandleIncomingMessage(). Holding the lock will lead to // deadlock. // // It is safe to call into |client| without the lock. Because |client| is // always accessed on the same sequence, including DetachEndpointClient(). MayAutoUnlock unlocker(&lock_); Message tmp_message = message_wrapper->DeserializeEndpointHandlesAndTake(); result = !tmp_message.IsNull() && client->HandleIncomingMessage(&tmp_message); } if (!result) RaiseErrorInNonTestingMode(); return true; } void MultiplexRouter::MaybePostToProcessTasks( base::SequencedTaskRunner* task_runner) { AssertLockAcquired(); if (posted_to_process_tasks_) return; posted_to_process_tasks_ = true; posted_to_task_runner_ = task_runner; task_runner->PostTask( FROM_HERE, base::Bind(&MultiplexRouter::LockAndCallProcessTasks, this)); } void MultiplexRouter::LockAndCallProcessTasks() { // There is no need to hold a ref to this class in this case because this is // always called using base::Bind(), which holds a ref. MayAutoLock locker(&lock_); posted_to_process_tasks_ = false; scoped_refptr<base::SequencedTaskRunner> runner( std::move(posted_to_task_runner_)); ProcessTasks(ALLOW_DIRECT_CLIENT_CALLS, runner.get()); } void MultiplexRouter::UpdateEndpointStateMayRemove( InterfaceEndpoint* endpoint, EndpointStateUpdateType type) { if (type == ENDPOINT_CLOSED) { endpoint->set_closed(); } else { endpoint->set_peer_closed(); // If the interface endpoint is performing a sync watch, this makes sure // it is notified and eventually exits the sync watch. endpoint->SignalSyncMessageEvent(); } if (endpoint->closed() && endpoint->peer_closed()) endpoints_.erase(endpoint->id()); } void MultiplexRouter::RaiseErrorInNonTestingMode() { AssertLockAcquired(); if (!testing_mode_) RaiseError(); } MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindOrInsertEndpoint( InterfaceId id, bool* inserted) { AssertLockAcquired(); // Either |inserted| is nullptr or it points to a boolean initialized as // false. DCHECK(!inserted || !*inserted); InterfaceEndpoint* endpoint = FindEndpoint(id); if (!endpoint) { endpoint = new InterfaceEndpoint(this, id); endpoints_[id] = endpoint; if (inserted) *inserted = true; } return endpoint; } MultiplexRouter::InterfaceEndpoint* MultiplexRouter::FindEndpoint( InterfaceId id) { AssertLockAcquired(); auto iter = endpoints_.find(id); return iter != endpoints_.end() ? iter->second.get() : nullptr; } void MultiplexRouter::AssertLockAcquired() { #if DCHECK_IS_ON() if (lock_) lock_->AssertAcquired(); #endif } bool MultiplexRouter::InsertEndpointsForMessage(const Message& message) { if (!message.is_serialized()) return true; uint32_t num_ids = message.payload_num_interface_ids(); if (num_ids == 0) return true; const uint32_t* ids = message.payload_interface_ids(); MayAutoLock locker(&lock_); for (uint32_t i = 0; i < num_ids; ++i) { // Message header validation already ensures that the IDs are valid and not // the master ID. // The IDs are from the remote side and therefore their namespace bit is // supposed to be different than the value that this router would use. if (set_interface_id_namespace_bit_ == HasInterfaceIdNamespaceBitSet(ids[i])) { return false; } // It is possible that the endpoint already exists even when the remote side // is well-behaved: it might have notified us that the peer endpoint has // closed. bool inserted = false; InterfaceEndpoint* endpoint = FindOrInsertEndpoint(ids[i], &inserted); if (endpoint->closed() || endpoint->handle_created()) return false; } return true; } void MultiplexRouter::CloseEndpointsForMessage(const Message& message) { AssertLockAcquired(); if (!message.is_serialized()) return; uint32_t num_ids = message.payload_num_interface_ids(); if (num_ids == 0) return; const uint32_t* ids = message.payload_interface_ids(); for (uint32_t i = 0; i < num_ids; ++i) { InterfaceEndpoint* endpoint = FindEndpoint(ids[i]); // If the remote side maliciously sends the same interface ID in another // message which has been dispatched, we could get here with no endpoint // for the ID, a closed endpoint, or an endpoint with handle created. if (!endpoint || endpoint->closed() || endpoint->handle_created()) { RaiseErrorInNonTestingMode(); continue; } UpdateEndpointStateMayRemove(endpoint, ENDPOINT_CLOSED); MayAutoUnlock unlocker(&lock_); control_message_proxy_.NotifyPeerEndpointClosed(ids[i], base::nullopt); } ProcessTasks(NO_DIRECT_CLIENT_CALLS, nullptr); } } // namespace internal } // namespace mojo