// Copyright 2013 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "mojo/system/local_message_pipe_endpoint.h" #include <string.h> #include "base/logging.h" #include "mojo/system/dispatcher.h" #include "mojo/system/message_in_transit.h" namespace mojo { namespace system { LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry() : message(NULL) { } // See comment in header file. LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry( const MessageQueueEntry& other) : message(NULL) { DCHECK(!other.message); DCHECK(other.dispatchers.empty()); } LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() { if (message) message->Destroy(); // Close all the dispatchers. for (size_t i = 0; i < dispatchers.size(); i++) { // Note: Taking the |Dispatcher| locks is okay, since no one else should // have a reference to the dispatchers (and the locks shouldn't be held). DCHECK(dispatchers[i]->HasOneRef()); dispatchers[i]->Close(); } } LocalMessagePipeEndpoint::LocalMessagePipeEndpoint() : is_open_(true), is_peer_open_(true) { } LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() { DCHECK(!is_open_); } void LocalMessagePipeEndpoint::Close() { DCHECK(is_open_); is_open_ = false; message_queue_.clear(); } bool LocalMessagePipeEndpoint::OnPeerClose() { DCHECK(is_open_); DCHECK(is_peer_open_); MojoWaitFlags old_satisfied_flags = SatisfiedFlags(); MojoWaitFlags old_satisfiable_flags = SatisfiableFlags(); is_peer_open_ = false; MojoWaitFlags new_satisfied_flags = SatisfiedFlags(); MojoWaitFlags new_satisfiable_flags = SatisfiableFlags(); if (new_satisfied_flags != old_satisfied_flags || new_satisfiable_flags != old_satisfiable_flags) { waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags, new_satisfiable_flags); } return true; } MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage( const MessageInTransit* /*message*/, const std::vector<Dispatcher*>* /*dispatchers*/) { return MOJO_RESULT_OK; } void LocalMessagePipeEndpoint::EnqueueMessage( MessageInTransit* message, std::vector<scoped_refptr<Dispatcher> >* dispatchers) { DCHECK(is_open_); DCHECK(is_peer_open_); bool was_empty = message_queue_.empty(); message_queue_.push_back(MessageQueueEntry()); message_queue_.back().message = message; if (dispatchers) { #ifndef NDEBUG // It's important that we're taking "ownership" of the dispatchers. In // particular, they must not be in the global handle table (i.e., have live // handles referring to them). If we need to destroy any queued messages, we // need to know that any handles in them should be closed. for (size_t i = 0; i < dispatchers->size(); i++) DCHECK((*dispatchers)[i]->HasOneRef()); #endif message_queue_.back().dispatchers.swap(*dispatchers); } if (was_empty) { waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), SatisfiableFlags()); } } void LocalMessagePipeEndpoint::CancelAllWaiters() { DCHECK(is_open_); waiter_list_.CancelAllWaiters(); } MojoResult LocalMessagePipeEndpoint::ReadMessage( void* bytes, uint32_t* num_bytes, std::vector<scoped_refptr<Dispatcher> >* dispatchers, uint32_t* num_dispatchers, MojoReadMessageFlags flags) { DCHECK(is_open_); DCHECK(!dispatchers || dispatchers->empty()); const uint32_t max_bytes = num_bytes ? *num_bytes : 0; const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0; if (message_queue_.empty()) { return is_peer_open_ ? MOJO_RESULT_NOT_FOUND : MOJO_RESULT_FAILED_PRECONDITION; } // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop // and release the lock immediately. bool enough_space = true; const MessageInTransit* queued_message = message_queue_.front().message; if (num_bytes) *num_bytes = queued_message->data_size(); if (queued_message->data_size() <= max_bytes) memcpy(bytes, queued_message->data(), queued_message->data_size()); else enough_space = false; std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers = &message_queue_.front().dispatchers; if (num_dispatchers) *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size()); if (enough_space) { if (queued_dispatchers->empty()) { // Nothing to do. } else if (queued_dispatchers->size() <= max_num_dispatchers) { DCHECK(dispatchers); dispatchers->swap(*queued_dispatchers); } else { enough_space = false; } } if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) { message_queue_.pop_front(); // Now it's empty, thus no longer readable. if (message_queue_.empty()) { // It's currently not possible to wait for non-readability, but we should // do the state change anyway. waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(), SatisfiableFlags()); } } if (!enough_space) return MOJO_RESULT_RESOURCE_EXHAUSTED; return MOJO_RESULT_OK; } MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter, MojoWaitFlags flags, MojoResult wake_result) { DCHECK(is_open_); if ((flags & SatisfiedFlags())) return MOJO_RESULT_ALREADY_EXISTS; if (!(flags & SatisfiableFlags())) return MOJO_RESULT_FAILED_PRECONDITION; waiter_list_.AddWaiter(waiter, flags, wake_result); return MOJO_RESULT_OK; } void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) { DCHECK(is_open_); waiter_list_.RemoveWaiter(waiter); } MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() { MojoWaitFlags satisfied_flags = 0; if (!message_queue_.empty()) satisfied_flags |= MOJO_WAIT_FLAG_READABLE; if (is_peer_open_) satisfied_flags |= MOJO_WAIT_FLAG_WRITABLE; return satisfied_flags; } MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() { MojoWaitFlags satisfiable_flags = 0; if (!message_queue_.empty() || is_peer_open_) satisfiable_flags |= MOJO_WAIT_FLAG_READABLE; if (is_peer_open_) satisfiable_flags |= MOJO_WAIT_FLAG_WRITABLE; return satisfiable_flags; } } // namespace system } // namespace mojo