普通文本  |  269行  |  8.18 KB

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "lib/fidl/cpp/internal/message_reader.h"

#include <lib/async/default.h>
#include <lib/fidl/cpp/message_buffer.h>
#include <lib/fidl/epitaph.h>
#include <zircon/assert.h>

namespace fidl {
namespace internal {
namespace {

constexpr zx_signals_t kSignals = ZX_CHANNEL_READABLE | ZX_CHANNEL_PEER_CLOSED;

// |Canary| is a stack-allocated object that observes when a |MessageReader| is
// destroyed or unbound from the current channel.
//
// Because |WaitAndDispatchOneMessageUntil| can be called re-entrantly, we can
// be in a state where there are N nested calls to |ReadAndDispatchMessage| on
// the stack. While dispatching any of those messages, the client can destroy
// the |MessageReader| or unbind it from the current channel. When that happens
// we need to stop reading messages from the channel and unwind the stack
// safely.
//
// The |Canary| works by storing a pointer to its |should_stop_| field in the
// |MessageReader|.  Upon destruction or unbinding, the |MessageReader| writes
// |true| into |should_stop_|. When we unwind the stack, the |Canary| forwards
// that value to the next |Canary| on the stack.
class Canary {
 public:
  explicit Canary(bool** should_stop_slot)
      : should_stop_slot_(should_stop_slot),
        previous_should_stop_(*should_stop_slot_),
        should_stop_(false) {
    *should_stop_slot_ = &should_stop_;
  }

  ~Canary() {
    if (should_stop_) {
      // If we should stop, we need to propagate that information to the
      // |Canary| higher up the stack, if any. We also cannot touch
      // |*should_stop_slot_| because the |MessageReader| might have been
      // destroyed (or bound to another channel).
      if (previous_should_stop_)
        *previous_should_stop_ = should_stop_;
    } else {
      // Otherwise, the |MessageReader| was not destroyed and is still bound to
      // the same channel. We need to restore the previous |should_stop_|
      // pointer so that a |Canary| further up the stack can still be informed
      // about whether to stop.
      *should_stop_slot_ = previous_should_stop_;
    }
  }

  // Whether the |ReadAndDispatchMessage| that created the |Canary| should stop
  // after dispatching the current message.
  bool should_stop() const { return should_stop_; }

 private:
  bool** should_stop_slot_;
  bool* previous_should_stop_;
  bool should_stop_;
};

}  // namespace

static_assert(std::is_standard_layout<MessageReader>::value,
              "We need offsetof to work");

MessageReader::MessageReader(MessageHandler* message_handler)
    : wait_{{ASYNC_STATE_INIT},
            &MessageReader::CallHandler,
            ZX_HANDLE_INVALID,
            kSignals},
      dispatcher_(nullptr),
      should_stop_(nullptr),
      message_handler_(message_handler),
      error_handler_(nullptr) {}

MessageReader::~MessageReader() {
  Stop();
  if (dispatcher_)
    async_cancel_wait(dispatcher_, &wait_);
}

zx_status_t MessageReader::Bind(zx::channel channel,
                                async_dispatcher_t* dispatcher) {
  if (is_bound())
    Unbind();
  if (!channel)
    return ZX_OK;
  channel_ = std::move(channel);
  if (dispatcher) {
    dispatcher_ = dispatcher;
  } else {
    dispatcher_ = async_get_default_dispatcher();
  }
  ZX_ASSERT_MSG(dispatcher_ != nullptr,
                "either |dispatcher| must be non-null, or "
                "|async_get_default_dispatcher| must "
                "be configured to return a non-null value");
  wait_.object = channel_.get();
  zx_status_t status = async_begin_wait(dispatcher_, &wait_);
  if (status != ZX_OK)
    Unbind();
  return status;
}

zx::channel MessageReader::Unbind() {
  if (!is_bound())
    return zx::channel();
  Stop();
  async_cancel_wait(dispatcher_, &wait_);
  wait_.object = ZX_HANDLE_INVALID;
  dispatcher_ = nullptr;
  zx::channel channel = std::move(channel_);
  if (message_handler_)
    message_handler_->OnChannelGone();
  return channel;
}

void MessageReader::Reset() {
  Unbind();
  error_handler_ = nullptr;
}

zx_status_t MessageReader::TakeChannelAndErrorHandlerFrom(
    MessageReader* other) {
  zx_status_t status = Bind(other->Unbind(), other->dispatcher_);
  if (status != ZX_OK)
    return status;
  error_handler_ = std::move(other->error_handler_);
  return ZX_OK;
}

zx_status_t MessageReader::WaitAndDispatchOneMessageUntil(zx::time deadline) {
  if (!is_bound())
    return ZX_ERR_BAD_STATE;
  zx_signals_t pending = ZX_SIGNAL_NONE;
  zx_status_t status = channel_.wait_one(kSignals, deadline, &pending);
  if (status == ZX_ERR_TIMED_OUT)
    return status;
  if (status != ZX_OK) {
    NotifyError(status);
    return status;
  }

  if (pending & ZX_CHANNEL_READABLE) {
    MessageBuffer buffer;
    return ReadAndDispatchMessage(&buffer);
  }

  ZX_DEBUG_ASSERT(pending & ZX_CHANNEL_PEER_CLOSED);
  NotifyError(ZX_ERR_PEER_CLOSED);
  return ZX_ERR_PEER_CLOSED;
}

void MessageReader::CallHandler(async_dispatcher_t* dispatcher,
                                async_wait_t* wait, zx_status_t status,
                                const zx_packet_signal_t* signal) {
  static_assert(offsetof(MessageReader, wait_) == 0,
                "The wait must be the first member for this cast to be valid.");
  reinterpret_cast<MessageReader*>(wait)->OnHandleReady(dispatcher, status,
                                                        signal);
}

void MessageReader::OnHandleReady(async_dispatcher_t* dispatcher,
                                  zx_status_t status,
                                  const zx_packet_signal_t* signal) {
  if (status != ZX_OK) {
    NotifyError(status);
    return;
  }

  if (signal->observed & ZX_CHANNEL_READABLE) {
    MessageBuffer buffer;
    for (uint64_t i = 0; i < signal->count; i++) {
      status = ReadAndDispatchMessage(&buffer);
      // If ReadAndDispatchMessage returns ZX_ERR_STOP, that means the message
      // handler has destroyed this object and we need to unwind without
      // touching |this|.
      if (status == ZX_ERR_SHOULD_WAIT)
        break;
      if (status != ZX_OK)
        return;
    }
    status = async_begin_wait(dispatcher, &wait_);
    if (status != ZX_OK) {
      NotifyError(status);
    }
    return;
  }

  ZX_DEBUG_ASSERT(signal->observed & ZX_CHANNEL_PEER_CLOSED);
  // Notice that we don't notify an error until we've drained all the messages
  // out of the channel.
  NotifyError(ZX_ERR_PEER_CLOSED);
}

zx_status_t MessageReader::ReadAndDispatchMessage(MessageBuffer* buffer) {
  Message message = buffer->CreateEmptyMessage();
  zx_status_t status = message.Read(channel_.get(), 0);
  if (status == ZX_ERR_SHOULD_WAIT)
    return status;
  if (status != ZX_OK) {
    NotifyError(status);
    return status;
  }

  if (message.has_header() && message.ordinal() == FIDL_EPITAPH_ORDINAL) {
    // This indicates the message is an epitaph, and that any epitaph-friendly
    // error handlers should be invoked.  Note the epitaph error is stored in
    // the header's reserved word.

    // TODO(FIDL-322): Use a different error code to distinguish remote encoding
    // errors from local ones.
    if (message.bytes().actual() != sizeof(fidl_epitaph_t)) {
      NotifyError(ZX_ERR_INVALID_ARGS);
      return ZX_ERR_INVALID_ARGS;
    }
    fidl_epitaph_t* epitaph = message.GetBytesAs<fidl_epitaph_t>();
    NotifyError(epitaph->hdr.reserved0);
    return ZX_ERR_PEER_CLOSED;
  }

  if (!message_handler_)
    return ZX_OK;
  Canary canary(&should_stop_);
  status = message_handler_->OnMessage(std::move(message));
  if (canary.should_stop())
    return ZX_ERR_STOP;
  if (status != ZX_OK)
    NotifyError(status);
  return status;
}

zx_status_t MessageReader::Close(zx_status_t epitaph_value) {
  if (!is_bound()) {
    return ZX_ERR_BAD_STATE;
  }

  zx_status_t status = fidl_epitaph_write(channel_.get(), epitaph_value);
  if (status != ZX_OK) {
    return status;
  }
  Unbind();
  return ZX_OK;
}

void MessageReader::NotifyError(zx_status_t epitaph_value) {
  Unbind();
  if (error_handler_) {
    error_handler_(epitaph_value);
  }
}

void MessageReader::Stop() {
  if (should_stop_) {
    *should_stop_ = true;
    should_stop_ = nullptr;
  }
}

}  // namespace internal
}  // namespace fidl