// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/system/channel.h"
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/message_loop/message_loop.h"
#include "base/strings/stringprintf.h"
namespace mojo {
namespace system {
// Compile-time guarantee that the bootstrap endpoint ID can never be confused
// with the "invalid endpoint ID" sentinel. |Channel::Channel()| seeds
// |next_local_id_| with the bootstrap ID, so it must be a usable ID.
COMPILE_ASSERT(Channel::kBootstrapEndpointId !=
MessageInTransit::kInvalidEndpointId,
kBootstrapEndpointId_is_invalid);
// Out-of-line definition of the class-scope constant |kBootstrapEndpointId|
// (its value is presumably supplied at the in-class declaration in the header).
STATIC_CONST_MEMBER_DEFINITION const MessageInTransit::EndpointId
Channel::kBootstrapEndpointId;
// Default constructor. It initializes nothing explicitly; it exists so that
// |EndpointInfo| can be default-constructed and then assigned, as done by
// |local_id_to_endpoint_info_map_[...] = ...| and by the local copies taken
// under |lock_| in this file.
Channel::EndpointInfo::EndpointInfo() {
}
// Builds the record stored per attached endpoint: a reference to the message
// pipe plus the index of the port on that pipe that the endpoint belongs to.
Channel::EndpointInfo::EndpointInfo(scoped_refptr<MessagePipe> message_pipe,
unsigned port)
: message_pipe(message_pipe),
port(port) {
}
// Out-of-line destructor with no custom logic; members are destroyed by their
// own destructors.
Channel::EndpointInfo::~EndpointInfo() {
}
// Constructs a channel that is not yet connected to anything; |Init()| must be
// called before use. Local endpoint ID allocation starts at
// |kBootstrapEndpointId|, so that is the first ID
// |AttachMessagePipeEndpoint()| will try to hand out.
Channel::Channel()
: next_local_id_(kBootstrapEndpointId) {
}
// Creates the underlying |RawChannel| for |handle| on the current message loop
// (with this object passed as the second argument to |RawChannel::Create()|)
// and initializes it.
//
// Must be called on the creation thread, and at most once while no raw channel
// exists. Returns true on success; on failure the raw channel is discarded and
// false is returned.
bool Channel::Init(const PlatformChannelHandle& handle) {
  DCHECK(creation_thread_checker_.CalledOnValidThread());

  // |lock_| is deliberately not taken: this runs before the object is shared
  // with other threads.
  DCHECK(!raw_channel_.get());

  raw_channel_.reset(
      RawChannel::Create(handle, this, base::MessageLoop::current()));
  if (raw_channel_->Init())
    return true;

  // Initialization failed; drop the half-constructed raw channel so that the
  // destructor's "already shut down" check still holds.
  raw_channel_.reset();
  return false;
}
// Shuts down and releases the underlying raw channel. Must be called on the
// creation thread, and only while a raw channel exists (i.e., after a
// successful |Init()| and not after a previous |Shutdown()|); this is only
// enforced by DCHECKs.
void Channel::Shutdown() {
DCHECK(creation_thread_checker_.CalledOnValidThread());
// Taken because |WriteMessage()| reads |raw_channel_| under the same lock.
base::AutoLock locker(lock_);
DCHECK(raw_channel_.get());
raw_channel_->Shutdown();
// After this, |WriteMessage()| sees a null raw channel and returns false.
raw_channel_.reset();
// TODO(vtl): Should I clear |local_id_to_endpoint_info_map_|? Or assert that
// it's empty?
}
// Registers port |port| of |message_pipe| with this channel and returns the
// local endpoint ID allocated for it.
//
// The ID is the first one, starting at |next_local_id_|, that is neither
// |MessageInTransit::kInvalidEndpointId| nor already present in
// |local_id_to_endpoint_info_map_|. After the map entry is recorded (under
// |lock_|), the pipe is told about the attachment via |MessagePipe::Attach()|
// with the lock released.
MessageInTransit::EndpointId Channel::AttachMessagePipeEndpoint(
    scoped_refptr<MessagePipe> message_pipe, unsigned port) {
  MessageInTransit::EndpointId local_id;
  {
    base::AutoLock locker(lock_);

    // Claim candidate IDs one at a time until one is usable. On exit,
    // |next_local_id_| is one past the ID that was chosen.
    for (;;) {
      local_id = next_local_id_;
      next_local_id_++;
      const bool is_valid = local_id != MessageInTransit::kInvalidEndpointId;
      if (is_valid &&
          local_id_to_endpoint_info_map_.find(local_id) ==
              local_id_to_endpoint_info_map_.end()) {
        break;
      }
    }

    // TODO(vtl): Use emplace when we move to C++11 unordered_maps. (It'll avoid
    // some expensive reference count increment/decrements.) Once this is done,
    // we should be able to delete |EndpointInfo|'s default constructor.
    local_id_to_endpoint_info_map_[local_id] = EndpointInfo(message_pipe, port);
  }

  message_pipe->Attach(port, scoped_refptr<Channel>(this), local_id);
  return local_id;
}
// Starts the endpoint previously attached under |local_id|, pairing it with
// the remote endpoint |remote_id|, by calling |MessagePipe::Run()| on the
// recorded pipe/port.
//
// |local_id| must refer to an attached endpoint; this is enforced with a
// CHECK (fatal in all builds).
void Channel::RunMessagePipeEndpoint(MessageInTransit::EndpointId local_id,
                                     MessageInTransit::EndpointId remote_id) {
  // Copy the record out under the lock so the pipe can be called without
  // holding |lock_|.
  EndpointInfo info;
  {
    base::AutoLock locker(lock_);
    IdToEndpointInfoMap::const_iterator found =
        local_id_to_endpoint_info_map_.find(local_id);
    CHECK(found != local_id_to_endpoint_info_map_.end());
    info = found->second;
  }

  info.message_pipe->Run(info.port, remote_id);
}
// Forwards |message| to the underlying raw channel and returns its result.
// If the channel has no raw channel (e.g., after |Shutdown()|), logs and
// returns false without forwarding.
//
// NOTE(review): on the "no raw channel" path |message| is neither forwarded
// nor freed here; confirm who owns it when this returns false.
bool Channel::WriteMessage(MessageInTransit* message) {
  base::AutoLock locker(lock_);

  if (raw_channel_.get())
    return raw_channel_->WriteMessage(message);

  // TODO(vtl): I think this is probably not an error condition, but I should
  // think about it (and the shutdown sequence) more carefully.
  LOG(INFO) << "WriteMessage() after shutdown";
  return false;
}
// Removes the record for |local_id| from the local endpoint map. |local_id|
// must not be the invalid endpoint ID (DCHECKed); erasing an ID that is not in
// the map is a no-op.
void Channel::DetachMessagePipeEndpoint(MessageInTransit::EndpointId local_id) {
  DCHECK_NE(local_id, MessageInTransit::kInvalidEndpointId);

  base::AutoLock locker(lock_);
  local_id_to_endpoint_info_map_.erase(local_id);
}
// Destructor. Does no teardown of its own: callers are expected to have called
// |Shutdown()| (or to have had |Init()| fail) so that no raw channel remains.
// This is only verified in DCHECK-enabled builds.
Channel::~Channel() {
// The channel should have been shut down first.
DCHECK(!raw_channel_.get());
}
// Entry point for an incoming |message| (presumably invoked by the raw channel,
// which is given |this| in |Init()|). Routes the message by type:
//   - message pipe endpoint / message pipe messages go to
//     |OnReadMessageForDownstream()|;
//   - channel messages go to |OnReadMessageForChannel()|;
//   - anything else is reported as a remote error.
void Channel::OnReadMessage(const MessageInTransit& message) {
  switch (message.type()) {
    case MessageInTransit::TYPE_CHANNEL:
      OnReadMessageForChannel(message);
      return;
    case MessageInTransit::kTypeMessagePipeEndpoint:
    case MessageInTransit::kTypeMessagePipe:
      OnReadMessageForDownstream(message);
      return;
    default:
      break;
  }

  // Unknown type: the remote side sent something we don't understand.
  HandleRemoteError(base::StringPrintf(
      "Received message of invalid type %u",
      static_cast<unsigned>(message.type())));
}
// Fatal error notification (presumably from the raw channel, which is given
// |this| in |Init()|). Currently a stub: |fatal_error| is ignored and the call
// only reports NOTIMPLEMENTED().
void Channel::OnFatalError(FatalError fatal_error) {
// TODO(vtl): IMPORTANT. Notify all our endpoints that they're dead.
NOTIMPLEMENTED();
}
// Delivers an incoming message-pipe(-endpoint) |message| to the local endpoint
// named by its destination ID.
//
// Remote-caused problems (no destination ID, or a destination ID that is not
// attached) are reported via |HandleRemoteError()| and the message is dropped.
// A failure to enqueue on the local pipe is reported via |HandleLocalError()|.
//
// The endpoint record is copied out under |lock_|; all error reporting and the
// call into the message pipe happen with the lock released, so that neither
// the string formatting nor whatever the error handlers eventually do runs
// inside the critical section (|HandleRemoteError()| could otherwise never
// safely call back into anything that takes |lock_|).
void Channel::OnReadMessageForDownstream(const MessageInTransit& message) {
  DCHECK(message.type() == MessageInTransit::kTypeMessagePipeEndpoint ||
         message.type() == MessageInTransit::kTypeMessagePipe);

  const MessageInTransit::EndpointId local_id = message.destination_id();
  if (local_id == MessageInTransit::kInvalidEndpointId) {
    HandleRemoteError("Received message with no destination ID");
    return;
  }

  EndpointInfo endpoint_info;
  bool found_endpoint = false;
  {
    base::AutoLock locker(lock_);

    // Since we own |raw_channel_|, and this method and |Shutdown()| should only
    // be called from the creation thread, |raw_channel_| should never be null
    // here.
    DCHECK(raw_channel_.get());

    IdToEndpointInfoMap::const_iterator it =
        local_id_to_endpoint_info_map_.find(local_id);
    if (it != local_id_to_endpoint_info_map_.end()) {
      endpoint_info = it->second;
      found_endpoint = true;
    }
  }

  // Report the lookup failure only after |lock_| has been released.
  if (!found_endpoint) {
    HandleRemoteError(base::StringPrintf(
        "Received a message for nonexistent local destination ID %u",
        static_cast<unsigned>(local_id)));
    return;
  }

  // We need to duplicate the message, because |EnqueueMessage()| will take
  // ownership of it.
  MessageInTransit* own_message = MessageInTransit::Create(
      message.type(), message.subtype(), message.data(), message.data_size());
  // The message is enqueued on the peer of the port recorded for this
  // endpoint.
  if (endpoint_info.message_pipe->EnqueueMessage(
          MessagePipe::GetPeerPort(endpoint_info.port), own_message, NULL) !=
      MOJO_RESULT_OK) {
    HandleLocalError(base::StringPrintf(
        "Failed to enqueue message to local destination ID %u",
        static_cast<unsigned>(local_id)));
  }
}
// Handles an incoming message addressed to the channel itself. No channel
// messages are defined yet, so every such message is reported as a remote
// error and otherwise ignored.
//
// This path is reachable: |OnReadMessage()| dispatches here for any message
// whose type is |MessageInTransit::TYPE_CHANNEL|, and that type comes from the
// remote side. It therefore must not assert unreachability -- doing so would
// let a misbehaving peer trip an assertion in DCHECK-enabled builds.
void Channel::OnReadMessageForChannel(const MessageInTransit& message) {
  // TODO(vtl): Currently no channel-only messages yet.
  HandleRemoteError("Received invalid channel message");
}
// Reports an error attributed to the remote side of the channel (malformed or
// misaddressed incoming messages). Currently this only logs |error_message| at
// INFO severity; no other action is taken.
void Channel::HandleRemoteError(const base::StringPiece& error_message) {
// TODO(vtl): Is this how we really want to handle this?
LOG(INFO) << error_message;
}
// Reports an error attributed to the local side (e.g., failing to enqueue a
// received message on a local pipe). Currently this logs |error_message| at
// FATAL severity.
void Channel::HandleLocalError(const base::StringPiece& error_message) {
// TODO(vtl): Is this how we really want to handle this?
LOG(FATAL) << error_message;
}
} // namespace system
} // namespace mojo