// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/system/raw_channel.h"
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include <algorithm>
#include <deque>
#include <vector>
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/compiler_specific.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop/message_loop.h"
#include "base/posix/eintr_wrapper.h"
#include "base/synchronization/lock.h"
#include "mojo/system/message_in_transit.h"
#include "mojo/system/platform_channel_handle.h"
namespace mojo {
namespace system {
namespace {
const size_t kReadSize = 4096;
class RawChannelPosix : public RawChannel,
public base::MessageLoopForIO::Watcher {
public:
RawChannelPosix(const PlatformChannelHandle& handle,
Delegate* delegate,
base::MessageLoop* message_loop);
virtual ~RawChannelPosix();
// |RawChannel| implementation:
virtual bool Init() OVERRIDE;
virtual void Shutdown() OVERRIDE;
virtual bool WriteMessage(MessageInTransit* message) OVERRIDE;
private:
// |base::MessageLoopForIO::Watcher| implementation:
virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE;
virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE;
// Watches for |fd_| to become writable. Must be called on the I/O thread.
void WaitToWrite();
// Calls |delegate()->OnFatalError(fatal_error)|. Must be called on the I/O
// thread WITHOUT |write_lock_| held.
void CallOnFatalError(Delegate::FatalError fatal_error);
// Writes the message at the front of |write_message_queue_|, starting at
// |write_message_offset_|. It removes and destroys if the write completes and
// otherwise updates |write_message_offset_|. Returns true on success. Must be
// called under |write_lock_|.
bool WriteFrontMessageNoLock();
// Cancels all pending writes and destroys the contents of
// |write_message_queue_|. Should only be called if |is_dead_| is false; sets
// |is_dead_| to true. Must be called under |write_lock_|.
void CancelPendingWritesNoLock();
base::MessageLoopForIO* message_loop_for_io() {
return static_cast<base::MessageLoopForIO*>(message_loop());
}
int fd_;
// Only used on the I/O thread:
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> read_watcher_;
scoped_ptr<base::MessageLoopForIO::FileDescriptorWatcher> write_watcher_;
// We store data from |read()|s in |read_buffer_|. The start of |read_buffer_|
// is always aligned with a message boundary (we will copy memory to ensure
// this), but |read_buffer_| may be larger than the actual number of bytes we
// have.
std::vector<char> read_buffer_;
size_t read_buffer_num_valid_bytes_;
base::Lock write_lock_; // Protects the following members.
bool is_dead_;
std::deque<MessageInTransit*> write_message_queue_;
size_t write_message_offset_;
// This is used for posting tasks from write threads to the I/O thread. It
// must only be accessed under |write_lock_|. The weak pointers it produces
// are only used/invalidated on the I/O thread.
base::WeakPtrFactory<RawChannelPosix> weak_ptr_factory_;
DISALLOW_COPY_AND_ASSIGN(RawChannelPosix);
};
RawChannelPosix::RawChannelPosix(const PlatformChannelHandle& handle,
Delegate* delegate,
base::MessageLoop* message_loop)
: RawChannel(delegate, message_loop),
fd_(handle.fd),
read_buffer_num_valid_bytes_(0),
is_dead_(false),
write_message_offset_(0),
weak_ptr_factory_(this) {
CHECK_EQ(RawChannel::message_loop()->type(), base::MessageLoop::TYPE_IO);
DCHECK_NE(fd_, -1);
}
RawChannelPosix::~RawChannelPosix() {
DCHECK(is_dead_);
DCHECK_EQ(fd_, -1);
// No need to take the |write_lock_| here -- if there are still weak pointers
// outstanding, then we're hosed anyway (since we wouldn't be able to
// invalidate them cleanly, since we might not be on the I/O thread).
DCHECK(!weak_ptr_factory_.HasWeakPtrs());
// These must have been shut down/destroyed on the I/O thread.
DCHECK(!read_watcher_.get());
DCHECK(!write_watcher_.get());
}
bool RawChannelPosix::Init() {
DCHECK_EQ(base::MessageLoop::current(), message_loop());
DCHECK(!read_watcher_.get());
read_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
DCHECK(!write_watcher_.get());
write_watcher_.reset(new base::MessageLoopForIO::FileDescriptorWatcher());
// No need to take the lock. No one should be using us yet.
DCHECK(write_message_queue_.empty());
if (!message_loop_for_io()->WatchFileDescriptor(fd_, true,
base::MessageLoopForIO::WATCH_READ, read_watcher_.get(), this)) {
// TODO(vtl): I'm not sure |WatchFileDescriptor()| actually fails cleanly
// (in the sense of returning the message loop's state to what it was before
// it was called).
read_watcher_.reset();
write_watcher_.reset();
return false;
}
return true;
}
void RawChannelPosix::Shutdown() {
DCHECK_EQ(base::MessageLoop::current(), message_loop());
base::AutoLock locker(write_lock_);
if (!is_dead_)
CancelPendingWritesNoLock();
DCHECK_NE(fd_, -1);
if (close(fd_) != 0)
PLOG(ERROR) << "close";
fd_ = -1;
weak_ptr_factory_.InvalidateWeakPtrs();
read_watcher_.reset(); // This will stop watching (if necessary).
write_watcher_.reset(); // This will stop watching (if necessary).
}
// Reminder: This must be thread-safe, and takes ownership of |message| on
// success.
bool RawChannelPosix::WriteMessage(MessageInTransit* message) {
base::AutoLock locker(write_lock_);
if (is_dead_) {
message->Destroy();
return false;
}
if (!write_message_queue_.empty()) {
write_message_queue_.push_back(message);
return true;
}
write_message_queue_.push_front(message);
DCHECK_EQ(write_message_offset_, 0u);
bool result = WriteFrontMessageNoLock();
DCHECK(result || write_message_queue_.empty());
if (!result) {
// Even if we're on the I/O thread, don't call |OnFatalError()| in the
// nested context.
message_loop()->PostTask(FROM_HERE,
base::Bind(&RawChannelPosix::CallOnFatalError,
weak_ptr_factory_.GetWeakPtr(),
Delegate::FATAL_ERROR_FAILED_WRITE));
} else if (!write_message_queue_.empty()) {
// Set up to wait for the FD to become writable. If we're not on the I/O
// thread, we have to post a task to do this.
if (base::MessageLoop::current() == message_loop()) {
WaitToWrite();
} else {
message_loop()->PostTask(FROM_HERE,
base::Bind(&RawChannelPosix::WaitToWrite,
weak_ptr_factory_.GetWeakPtr()));
}
}
return result;
}
void RawChannelPosix::OnFileCanReadWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_);
DCHECK_EQ(base::MessageLoop::current(), message_loop());
bool did_dispatch_message = false;
// Tracks the offset of the first undispatched message in |read_buffer_|.
// Currently, we copy data to ensure that this is zero at the beginning.
size_t read_buffer_start = 0;
for (;;) {
if (read_buffer_.size() - (read_buffer_start + read_buffer_num_valid_bytes_)
< kReadSize) {
// Use power-of-2 buffer sizes.
// TODO(vtl): Make sure the buffer doesn't get too large (and enforce the
// maximum message size to whatever extent necessary).
// TODO(vtl): We may often be able to peek at the header and get the real
// required extra space (which may be much bigger than |kReadSize|).
size_t new_size = std::max(read_buffer_.size(), kReadSize);
while (new_size <
read_buffer_start + read_buffer_num_valid_bytes_ + kReadSize)
new_size *= 2;
// TODO(vtl): It's suboptimal to zero out the fresh memory.
read_buffer_.resize(new_size, 0);
}
ssize_t bytes_read = HANDLE_EINTR(
read(fd_,
&read_buffer_[read_buffer_start + read_buffer_num_valid_bytes_],
kReadSize));
if (bytes_read < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
PLOG(ERROR) << "read";
{
base::AutoLock locker(write_lock_);
CancelPendingWritesNoLock();
}
CallOnFatalError(Delegate::FATAL_ERROR_FAILED_READ);
return;
}
break;
}
read_buffer_num_valid_bytes_ += static_cast<size_t>(bytes_read);
// Dispatch all the messages that we can.
while (read_buffer_num_valid_bytes_ >= sizeof(MessageInTransit)) {
const MessageInTransit* message =
reinterpret_cast<const MessageInTransit*>(
&read_buffer_[read_buffer_start]);
DCHECK_EQ(reinterpret_cast<size_t>(message) %
MessageInTransit::kMessageAlignment, 0u);
// If we have the header, not the whole message....
if (read_buffer_num_valid_bytes_ <
message->size_with_header_and_padding())
break;
// Dispatch the message.
delegate()->OnReadMessage(*message);
if (!read_watcher_.get()) {
// |Shutdown()| was called in |OnReadMessage()|.
// TODO(vtl): Add test for this case.
return;
}
did_dispatch_message = true;
// Update our state.
read_buffer_start += message->size_with_header_and_padding();
read_buffer_num_valid_bytes_ -= message->size_with_header_and_padding();
}
// If we dispatched any messages, stop reading for now (and let the message
// loop do its thing for another round).
// TODO(vtl): Is this the behavior we want? (Alternatives: i. Dispatch only
// a single message. Risks: slower, more complex if we want to avoid lots of
// copying. ii. Keep reading until there's no more data and dispatch all the
// messages we can. Risks: starvation of other users of the message loop.)
if (did_dispatch_message)
break;
// If we didn't max out |kReadSize|, stop reading for now.
if (static_cast<size_t>(bytes_read) < kReadSize)
break;
// Else try to read some more....
}
// Move data back to start.
if (read_buffer_start > 0) {
memmove(&read_buffer_[0], &read_buffer_[read_buffer_start],
read_buffer_num_valid_bytes_);
read_buffer_start = 0;
}
}
void RawChannelPosix::OnFileCanWriteWithoutBlocking(int fd) {
DCHECK_EQ(fd, fd_);
DCHECK_EQ(base::MessageLoop::current(), message_loop());
bool did_fail = false;
{
base::AutoLock locker(write_lock_);
DCHECK(!is_dead_);
DCHECK(!write_message_queue_.empty());
bool result = WriteFrontMessageNoLock();
DCHECK(result || write_message_queue_.empty());
if (!result)
did_fail = true;
else if (!write_message_queue_.empty())
WaitToWrite();
}
if (did_fail)
CallOnFatalError(Delegate::FATAL_ERROR_FAILED_WRITE);
}
void RawChannelPosix::WaitToWrite() {
DCHECK_EQ(base::MessageLoop::current(), message_loop());
DCHECK(write_watcher_.get());
bool result = message_loop_for_io()->WatchFileDescriptor(
fd_, false, base::MessageLoopForIO::WATCH_WRITE, write_watcher_.get(),
this);
DCHECK(result);
}
void RawChannelPosix::CallOnFatalError(Delegate::FatalError fatal_error) {
DCHECK_EQ(base::MessageLoop::current(), message_loop());
// TODO(vtl): Add a "write_lock_.AssertNotAcquired()"?
delegate()->OnFatalError(fatal_error);
}
bool RawChannelPosix::WriteFrontMessageNoLock() {
write_lock_.AssertAcquired();
DCHECK(!is_dead_);
DCHECK(!write_message_queue_.empty());
MessageInTransit* message = write_message_queue_.front();
DCHECK_LT(write_message_offset_, message->size_with_header_and_padding());
size_t bytes_to_write =
message->size_with_header_and_padding() - write_message_offset_;
ssize_t bytes_written = HANDLE_EINTR(
write(fd_,
reinterpret_cast<char*>(message) + write_message_offset_,
bytes_to_write));
if (bytes_written < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
PLOG(ERROR) << "write of size " << bytes_to_write;
CancelPendingWritesNoLock();
return false;
}
// We simply failed to write since we'd block. The logic is the same as if
// we got a partial write.
bytes_written = 0;
}
DCHECK_GE(bytes_written, 0);
if (static_cast<size_t>(bytes_written) < bytes_to_write) {
// Partial (or no) write.
write_message_offset_ += static_cast<size_t>(bytes_written);
} else {
// Complete write.
DCHECK_EQ(static_cast<size_t>(bytes_written), bytes_to_write);
write_message_queue_.pop_front();
write_message_offset_ = 0;
message->Destroy();
}
return true;
}
void RawChannelPosix::CancelPendingWritesNoLock() {
write_lock_.AssertAcquired();
DCHECK(!is_dead_);
is_dead_ = true;
for (std::deque<MessageInTransit*>::iterator it =
write_message_queue_.begin(); it != write_message_queue_.end();
++it) {
(*it)->Destroy();
}
write_message_queue_.clear();
}
} // namespace
// -----------------------------------------------------------------------------
// Static factory method declared in raw_channel.h.
// static
RawChannel* RawChannel::Create(const PlatformChannelHandle& handle,
Delegate* delegate,
base::MessageLoop* message_loop) {
return new RawChannelPosix(handle, delegate, message_loop);
}
} // namespace system
} // namespace mojo