普通文本  |  145行  |  4.21 KB

// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "ipc/mojo/ipc_message_pipe_reader.h"

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/location.h"
#include "base/logging.h"
#include "base/message_loop/message_loop_proxy.h"
#include "mojo/public/cpp/environment/environment.h"

namespace IPC {
namespace internal {

MessagePipeReader::MessagePipeReader(mojo::ScopedMessagePipeHandle handle)
    : pipe_wait_id_(0),
      pipe_(handle.Pass()) {
  StartWaiting();
}

MessagePipeReader::~MessagePipeReader() {
  CHECK(!IsValid());
}

void MessagePipeReader::Close() {
  StopWaiting();
  pipe_.reset();
  OnPipeClosed();
}

void MessagePipeReader::CloseWithError(MojoResult error) {
  OnPipeError(error);
  Close();
}

// static
void MessagePipeReader::InvokePipeIsReady(void* closure, MojoResult result) {
  reinterpret_cast<MessagePipeReader*>(closure)->PipeIsReady(result);
}

void MessagePipeReader::StartWaiting() {
  DCHECK(pipe_.is_valid());
  DCHECK(!pipe_wait_id_);
  // Not using MOJO_HANDLE_SIGNAL_WRITABLE here, expecting buffer in
  // MessagePipe.
  //
  // TODO(morrita): Should we re-set the signal when we get new
  // message to send?
  pipe_wait_id_ = mojo::Environment::GetDefaultAsyncWaiter()->AsyncWait(
      pipe_.get().value(),
      MOJO_HANDLE_SIGNAL_READABLE,
      MOJO_DEADLINE_INDEFINITE,
      &InvokePipeIsReady,
      this);
}

void MessagePipeReader::StopWaiting() {
  if (!pipe_wait_id_)
    return;
  mojo::Environment::GetDefaultAsyncWaiter()->CancelWait(pipe_wait_id_);
  pipe_wait_id_ = 0;
}

void MessagePipeReader::PipeIsReady(MojoResult wait_result) {
  pipe_wait_id_ = 0;

  if (wait_result != MOJO_RESULT_OK) {
    if (wait_result != MOJO_RESULT_ABORTED) {
      // FAILED_PRECONDITION happens every time the peer is dead so
      // it isn't worth polluting the log message.
      DLOG_IF(WARNING, wait_result != MOJO_RESULT_FAILED_PRECONDITION)
          << "Pipe got error from the waiter. Closing: "
          << wait_result;
      OnPipeError(wait_result);
    }

    Close();
    return;
  }

  while (pipe_.is_valid()) {
    MojoResult read_result = ReadMessageBytes();
    if (read_result == MOJO_RESULT_SHOULD_WAIT)
      break;
    if (read_result != MOJO_RESULT_OK) {
      // FAILED_PRECONDITION means that all the received messages
      // got consumed and the peer is already closed.
      if (read_result != MOJO_RESULT_FAILED_PRECONDITION) {
        DLOG(WARNING)
            << "Pipe got error from ReadMessage(). Closing: " << read_result;
        OnPipeError(read_result);
      }

      Close();
      break;
    }

    OnMessageReceived();
  }

  if (pipe_.is_valid())
    StartWaiting();
}

MojoResult MessagePipeReader::ReadMessageBytes() {
  DCHECK(handle_buffer_.empty());

  uint32_t num_bytes = static_cast<uint32_t>(data_buffer_.size());
  uint32_t num_handles = 0;
  MojoResult result = MojoReadMessage(pipe_.get().value(),
                                      num_bytes ? &data_buffer_[0] : NULL,
                                      &num_bytes,
                                      NULL,
                                      &num_handles,
                                      MOJO_READ_MESSAGE_FLAG_NONE);
  data_buffer_.resize(num_bytes);
  handle_buffer_.resize(num_handles);
  if (result == MOJO_RESULT_RESOURCE_EXHAUSTED) {
    // MOJO_RESULT_RESOURCE_EXHAUSTED was asking the caller that
    // it needs more bufer. So we re-read it with resized buffers.
    result = MojoReadMessage(pipe_.get().value(),
                             num_bytes ? &data_buffer_[0] : NULL,
                             &num_bytes,
                             num_handles ? &handle_buffer_[0] : NULL,
                             &num_handles,
                             MOJO_READ_MESSAGE_FLAG_NONE);
  }

  DCHECK(0 == num_bytes || data_buffer_.size() == num_bytes);
  DCHECK(0 == num_handles || handle_buffer_.size() == num_handles);
  return result;
}

void MessagePipeReader::DelayedDeleter::operator()(
    MessagePipeReader* ptr) const {
  ptr->Close();
  base::MessageLoopProxy::current()->PostTask(
      FROM_HERE, base::Bind(&DeleteNow, ptr));
}

}  // namespace internal
}  // namespace IPC