// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/system/local_message_pipe_endpoint.h"
#include <string.h>
#include "base/logging.h"
#include "mojo/system/dispatcher.h"
#include "mojo/system/message_in_transit.h"
namespace mojo {
namespace system {
// Creates an empty queue entry: no message attached and (by default
// construction of the container) no dispatchers.
LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry()
    : message(NULL) {}
// See comment in header file.
//
// The copy never takes anything from |other|: the new entry always starts out
// with no message. Debug builds additionally assert that |other| is itself
// empty (no message, no dispatchers), i.e., only empty entries are copied.
LocalMessagePipeEndpoint::MessageQueueEntry::MessageQueueEntry(
    const MessageQueueEntry& other)
    : message(NULL) {
  DCHECK(!other.message);
  DCHECK(other.dispatchers.empty());
}
// Releases everything the entry still holds: destroys the attached message
// (if any) and closes every dispatcher that travelled with it.
LocalMessagePipeEndpoint::MessageQueueEntry::~MessageQueueEntry() {
  if (message) {
    message->Destroy();
  }

  // Close all the dispatchers.
  // Note: Taking the |Dispatcher| locks is okay, since no one else should
  // have a reference to the dispatchers (and the locks shouldn't be held).
  size_t i = 0;
  while (i < dispatchers.size()) {
    DCHECK(dispatchers[i]->HasOneRef());
    dispatchers[i]->Close();
    ++i;
  }
}
// A freshly constructed endpoint considers both itself and its peer open.
LocalMessagePipeEndpoint::LocalMessagePipeEndpoint()
    : is_open_(true), is_peer_open_(true) {}
// The endpoint must already have been closed (see |Close()|) by the time it
// is destroyed; debug builds assert this.
LocalMessagePipeEndpoint::~LocalMessagePipeEndpoint() {
  DCHECK(!is_open_);
}
// Marks this endpoint as closed and discards every message still queued.
// Must only be called on an endpoint that is currently open.
void LocalMessagePipeEndpoint::Close() {
  DCHECK(is_open_);

  is_open_ = false;
  // Dropping the entries runs their destructors, which destroy the queued
  // messages and close any dispatchers attached to them.
  message_queue_.clear();
}
bool LocalMessagePipeEndpoint::OnPeerClose() {
DCHECK(is_open_);
DCHECK(is_peer_open_);
MojoWaitFlags old_satisfied_flags = SatisfiedFlags();
MojoWaitFlags old_satisfiable_flags = SatisfiableFlags();
is_peer_open_ = false;
MojoWaitFlags new_satisfied_flags = SatisfiedFlags();
MojoWaitFlags new_satisfiable_flags = SatisfiableFlags();
if (new_satisfied_flags != old_satisfied_flags ||
new_satisfiable_flags != old_satisfiable_flags) {
waiter_list_.AwakeWaitersForStateChange(new_satisfied_flags,
new_satisfiable_flags);
}
return true;
}
// A local endpoint places no restrictions on what may be enqueued: both
// arguments are ignored and the answer is always |MOJO_RESULT_OK|.
MojoResult LocalMessagePipeEndpoint::CanEnqueueMessage(
    const MessageInTransit* /*message*/,
    const std::vector<Dispatcher*>* /*dispatchers*/) {
  return MOJO_RESULT_OK;
}
// Appends |message| (and, if |dispatchers| is non-null, the dispatchers it
// carries) to the back of the message queue. The contents of |*dispatchers|
// are swapped into the queue entry, so the caller's vector is left holding
// whatever the fresh entry held. Waiters are woken only when the queue goes
// from empty to non-empty. Both this endpoint and its peer must be open.
void LocalMessagePipeEndpoint::EnqueueMessage(
    MessageInTransit* message,
    std::vector<scoped_refptr<Dispatcher> >* dispatchers) {
  DCHECK(is_open_);
  DCHECK(is_peer_open_);

  const bool queue_was_empty = message_queue_.empty();

  // Push an empty entry first and then fill it in place.
  message_queue_.push_back(MessageQueueEntry());
  MessageQueueEntry& new_entry = message_queue_.back();
  new_entry.message = message;

  if (dispatchers) {
#ifndef NDEBUG
    // It's important that we're taking "ownership" of the dispatchers. In
    // particular, they must not be in the global handle table (i.e., have live
    // handles referring to them). If we need to destroy any queued messages, we
    // need to know that any handles in them should be closed.
    for (size_t i = 0; i < dispatchers->size(); ++i) {
      DCHECK((*dispatchers)[i]->HasOneRef());
    }
#endif
    new_entry.dispatchers.swap(*dispatchers);
  }

  // The wait state can only have changed if the queue just became non-empty.
  if (!queue_was_empty) {
    return;
  }
  waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
                                          SatisfiableFlags());
}
// Cancels every waiter currently registered on this endpoint by forwarding to
// the waiter list. The endpoint must be open.
void LocalMessagePipeEndpoint::CancelAllWaiters() {
  DCHECK(is_open_);

  waiter_list_.CancelAllWaiters();
}
// Reads the message at the front of the queue.
//
// |bytes|/|num_bytes|: destination buffer and, on input, its capacity (a null
//     |num_bytes| is treated as a capacity of zero). If |num_bytes| is
//     non-null and a message is queued, it is set to the size of that
//     message's data, whether or not the data fit.
// |dispatchers|/|num_dispatchers|: destination vector (must be null or empty)
//     and, on input, the maximum number of dispatchers accepted (a null
//     |num_dispatchers| is treated as zero). If |num_dispatchers| is non-null
//     and a message is queued, it is set to the number of dispatchers attached
//     to that message.
// |flags|: if |MOJO_READ_MESSAGE_FLAG_MAY_DISCARD| is set, the front message
//     is dequeued even when it did not fit.
//
// Returns:
//   |MOJO_RESULT_NOT_FOUND| if the queue is empty and the peer is still open;
//   |MOJO_RESULT_FAILED_PRECONDITION| if the queue is empty and the peer is
//       closed;
//   |MOJO_RESULT_RESOURCE_EXHAUSTED| if the data or the dispatchers did not
//       fit (note that the data may already have been copied to |bytes| when
//       only the dispatchers did not fit);
//   |MOJO_RESULT_OK| otherwise, in which case the message has been dequeued.
MojoResult LocalMessagePipeEndpoint::ReadMessage(
    void* bytes, uint32_t* num_bytes,
    std::vector<scoped_refptr<Dispatcher> >* dispatchers,
    uint32_t* num_dispatchers,
    MojoReadMessageFlags flags) {
  DCHECK(is_open_);
  DCHECK(!dispatchers || dispatchers->empty());

  // Capture the caller-supplied capacities before the out-parameters are
  // overwritten with the actual sizes below.
  const uint32_t max_bytes = num_bytes ? *num_bytes : 0;
  const uint32_t max_num_dispatchers = num_dispatchers ? *num_dispatchers : 0;

  if (message_queue_.empty()) {
    return is_peer_open_ ? MOJO_RESULT_NOT_FOUND :
                           MOJO_RESULT_FAILED_PRECONDITION;
  }

  // TODO(vtl): If |flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD|, we could pop
  // and release the lock immediately.
  bool enough_space = true;
  const MessageInTransit* queued_message = message_queue_.front().message;
  if (num_bytes)
    *num_bytes = queued_message->data_size();
  if (queued_message->data_size() <= max_bytes) {
    // An empty message fits into a zero-capacity buffer, for which |bytes| may
    // legitimately be null. Passing a null pointer to |memcpy()| is undefined
    // behavior even for a zero count, so skip the copy in that case.
    if (queued_message->data_size() > 0)
      memcpy(bytes, queued_message->data(), queued_message->data_size());
  } else {
    enough_space = false;
  }

  std::vector<scoped_refptr<Dispatcher> >* queued_dispatchers =
      &message_queue_.front().dispatchers;
  if (num_dispatchers)
    *num_dispatchers = static_cast<uint32_t>(queued_dispatchers->size());
  if (enough_space) {
    if (queued_dispatchers->empty()) {
      // Nothing to do.
    } else if (queued_dispatchers->size() <= max_num_dispatchers) {
      // A non-zero limit implies the caller provided somewhere to put them.
      DCHECK(dispatchers);
      dispatchers->swap(*queued_dispatchers);
    } else {
      enough_space = false;
    }
  }

  if (enough_space || (flags & MOJO_READ_MESSAGE_FLAG_MAY_DISCARD)) {
    message_queue_.pop_front();

    // Now it's empty, thus no longer readable.
    if (message_queue_.empty()) {
      // It's currently not possible to wait for non-readability, but we should
      // do the state change anyway.
      waiter_list_.AwakeWaitersForStateChange(SatisfiedFlags(),
                                              SatisfiableFlags());
    }
  }

  if (!enough_space)
    return MOJO_RESULT_RESOURCE_EXHAUSTED;

  return MOJO_RESULT_OK;
}
// Registers |waiter| to be woken (with |wake_result|) once any of |flags| is
// satisfied. Returns |MOJO_RESULT_ALREADY_EXISTS| without registering if one
// of |flags| is satisfied right now, |MOJO_RESULT_FAILED_PRECONDITION| without
// registering if none of |flags| is currently satisfiable, and
// |MOJO_RESULT_OK| once the waiter has been added. The endpoint must be open.
MojoResult LocalMessagePipeEndpoint::AddWaiter(Waiter* waiter,
                                               MojoWaitFlags flags,
                                               MojoResult wake_result) {
  DCHECK(is_open_);

  const bool already_satisfied = (flags & SatisfiedFlags());
  if (already_satisfied) {
    return MOJO_RESULT_ALREADY_EXISTS;
  }

  const bool can_be_satisfied = (flags & SatisfiableFlags());
  if (!can_be_satisfied) {
    return MOJO_RESULT_FAILED_PRECONDITION;
  }

  waiter_list_.AddWaiter(waiter, flags, wake_result);
  return MOJO_RESULT_OK;
}
// Unregisters |waiter| by forwarding to the waiter list. The endpoint must be
// open.
void LocalMessagePipeEndpoint::RemoveWaiter(Waiter* waiter) {
  DCHECK(is_open_);

  waiter_list_.RemoveWaiter(waiter);
}
// Returns the wait flags that are satisfied right now: writable while the
// peer is open, readable while at least one message is queued.
MojoWaitFlags LocalMessagePipeEndpoint::SatisfiedFlags() {
  MojoWaitFlags currently_satisfied = 0;

  if (is_peer_open_) {
    currently_satisfied |= MOJO_WAIT_FLAG_WRITABLE;
  }
  if (!message_queue_.empty()) {
    currently_satisfied |= MOJO_WAIT_FLAG_READABLE;
  }

  return currently_satisfied;
}
// Returns the wait flags that are, or could still become, satisfied. While
// the peer is open both reading and writing are possible. Once the peer is
// closed, writing never is, and reading only is while messages remain queued.
MojoWaitFlags LocalMessagePipeEndpoint::SatisfiableFlags() {
  if (is_peer_open_) {
    MojoWaitFlags read_and_write = 0;
    read_and_write |= MOJO_WAIT_FLAG_READABLE;
    read_and_write |= MOJO_WAIT_FLAG_WRITABLE;
    return read_and_write;
  }

  MojoWaitFlags read_only = 0;
  if (!message_queue_.empty()) {
    read_only |= MOJO_WAIT_FLAG_READABLE;
  }
  return read_only;
}
} // namespace system
} // namespace mojo