普通文本  |  179行  |  4.64 KB

// Copyright 2013 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/bindings/lib/connector.h"

#include <assert.h>
#include <stdlib.h>

#include <algorithm>

namespace mojo {
namespace internal {

// ----------------------------------------------------------------------------

Connector::Connector(ScopedMessagePipeHandle message_pipe)
    : message_pipe_(message_pipe.Pass()),
      incoming_receiver_(NULL),
      error_(false) {
}

Connector::~Connector() {
}

void Connector::SetIncomingReceiver(MessageReceiver* receiver) {
  assert(!incoming_receiver_);
  incoming_receiver_ = receiver;
  if (incoming_receiver_)
    WaitToReadMore();
}

bool Connector::Accept(Message* message) {
  if (error_)
    return false;

  bool wait_to_write;
  WriteOne(message, &wait_to_write);

  if (wait_to_write) {
    WaitToWriteMore();
    if (!error_)
      write_queue_.Push(message);
  }

  return !error_;
}

void Connector::OnHandleReady(Callback* callback, MojoResult result) {
  if (callback == &read_callback_)
    ReadMore();
  if (callback == &write_callback_)
    WriteMore();
}

void Connector::WaitToReadMore() {
  read_callback_.SetOwnerToNotify(this);
  read_callback_.SetAsyncWaitID(
      BindingsSupport::Get()->AsyncWait(message_pipe_.get(),
                                        MOJO_WAIT_FLAG_READABLE,
                                        &read_callback_));
}

void Connector::WaitToWriteMore() {
  write_callback_.SetOwnerToNotify(this);
  write_callback_.SetAsyncWaitID(
      BindingsSupport::Get()->AsyncWait(message_pipe_.get(),
                                        MOJO_WAIT_FLAG_WRITABLE,
                                        &write_callback_));
}

void Connector::ReadMore() {
  for (;;) {
    MojoResult rv;

    uint32_t num_bytes = 0, num_handles = 0;
    rv = ReadMessageRaw(message_pipe_.get(),
                        NULL,
                        &num_bytes,
                        NULL,
                        &num_handles,
                        MOJO_READ_MESSAGE_FLAG_NONE);
    if (rv == MOJO_RESULT_NOT_FOUND) {
      WaitToReadMore();
      break;
    }
    if (rv != MOJO_RESULT_RESOURCE_EXHAUSTED) {
      error_ = true;
      break;
    }

    Message message;
    message.data = static_cast<MessageData*>(malloc(num_bytes));
    message.handles.resize(num_handles);

    rv = ReadMessageRaw(message_pipe_.get(),
                        message.data,
                        &num_bytes,
                        message.handles.empty() ? NULL :
                            reinterpret_cast<MojoHandle*>(&message.handles[0]),
                        &num_handles,
                        MOJO_READ_MESSAGE_FLAG_NONE);
    if (rv != MOJO_RESULT_OK) {
      error_ = true;
      break;
    }

    incoming_receiver_->Accept(&message);
  }
}

void Connector::WriteMore() {
  while (!error_ && !write_queue_.IsEmpty()) {
    Message* message = write_queue_.Peek();

    bool wait_to_write;
    WriteOne(message, &wait_to_write);
    if (wait_to_write)
      break;

    write_queue_.Pop();
  }
}

void Connector::WriteOne(Message* message, bool* wait_to_write) {
  // TODO(darin): WriteMessageRaw will eventually start generating an error that
  // it cannot accept more data. In that case, we'll need to wait on the pipe
  // to determine when we can try writing again. This flag will be set to true
  // in that case.
  *wait_to_write = false;

  MojoResult rv = WriteMessageRaw(
      message_pipe_.get(),
      message->data,
      message->data->header.num_bytes,
      message->handles.empty() ? NULL :
          reinterpret_cast<const MojoHandle*>(&message->handles[0]),
      static_cast<uint32_t>(message->handles.size()),
      MOJO_WRITE_MESSAGE_FLAG_NONE);
  if (rv == MOJO_RESULT_OK) {
    // The handles were successfully transferred, so we don't need the message
    // to track their lifetime any longer.
    message->handles.clear();
  } else {
    error_ = true;
  }
}

// ----------------------------------------------------------------------------

Connector::Callback::Callback()
    : owner_(NULL),
      async_wait_id_(0) {
}

Connector::Callback::~Callback() {
  if (owner_)
    BindingsSupport::Get()->CancelWait(async_wait_id_);
}

void Connector::Callback::SetOwnerToNotify(Connector* owner) {
  assert(!owner_);
  owner_ = owner;
}

void Connector::Callback::SetAsyncWaitID(BindingsSupport::AsyncWaitID id) {
  async_wait_id_ = id;
}

void Connector::Callback::OnHandleReady(MojoResult result) {
  assert(owner_);
  Connector* owner = NULL;
  std::swap(owner, owner_);
  owner->OnHandleReady(this, result);
}

}  // namespace internal
}  // namespace mojo