普通文本  |  139行  |  4.37 KB

// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/cpp/system/string_data_pipe_producer.h"

#include <algorithm>
#include <limits>

#include "base/bind.h"
#include "base/callback.h"
#include "base/location.h"
#include "base/task_scheduler/post_task.h"

namespace mojo {

namespace {

// Attempts to write up to |*size| bytes of |data| to |producer|.
//
// On return, |*size| holds the number of bytes actually written, which may be
// anywhere from 0 to the requested size. The return value reflects the last
// attempted BeginWriteData() call, except that MOJO_RESULT_SHOULD_WAIT (the
// pipe is simply out of capacity for now) is reported as MOJO_RESULT_OK so
// that callers can tell "try again later" apart from a real failure.
MojoResult WriteDataToProducerHandle(DataPipeProducerHandle producer,
                                     const char* data,
                                     size_t* size) {
  // Two-phase writes are sized with a uint32_t, while |*size| is a size_t.
  // Progress is therefore tracked as a size_t and each individual write is
  // clamped to the uint32_t range. Narrowing |*size| up front would silently
  // drop the high bits for inputs larger than 4 GiB and cause a truncated
  // write to be reported as complete.
  constexpr size_t kMaxChunkSize = std::numeric_limits<uint32_t>::max();

  const size_t total_size = *size;
  size_t bytes_written = 0;

  // We loop here since the pipe's available capacity may be larger than its
  // *contiguous* capacity, and hence two independent consecutive two-phase
  // writes may succeed. The goal here is to write as much of |data| as possible
  // until we either run out of data or run out of capacity.
  MojoResult result;
  do {
    const uint32_t bytes_wanted = static_cast<uint32_t>(
        std::min(total_size - bytes_written, kMaxChunkSize));

    void* dest = nullptr;
    uint32_t capacity = bytes_wanted;
    result =
        producer.BeginWriteData(&dest, &capacity, MOJO_WRITE_DATA_FLAG_NONE);
    if (result == MOJO_RESULT_SHOULD_WAIT) {
      // Out of capacity is not an error; report what was written so far.
      result = MOJO_RESULT_OK;
      break;
    }
    if (result != MOJO_RESULT_OK)
      break;

    // The buffer handed back may be larger than what is left to write, so
    // never copy more than was asked for.
    capacity = std::min(capacity, bytes_wanted);
    memcpy(dest, data + bytes_written, capacity);
    MojoResult end_result = producer.EndWriteData(capacity);
    DCHECK_EQ(MOJO_RESULT_OK, end_result);

    bytes_written += capacity;
  } while (bytes_written < total_size);

  *size = bytes_written;
  return result;
}

}  // namespace

// Takes ownership of |producer|, the handle all subsequent writes go to.
//
// |watcher_| is created with the AUTOMATIC arming policy and is bound to the
// task runner returned by base::SequencedTaskRunnerHandle::Get() at
// construction time. |weak_factory_| is bound to |this| so that posted
// completion tasks can be handed weak pointers.
StringDataPipeProducer::StringDataPipeProducer(
    ScopedDataPipeProducerHandle producer)
    : producer_(std::move(producer)),
      watcher_(FROM_HERE,
               SimpleWatcher::ArmingPolicy::AUTOMATIC,
               base::SequencedTaskRunnerHandle::Get()),
      weak_factory_(this) {}

// Defaulted: all cleanup is left to the members' own destructors.
// NOTE(review): presumably destroying |watcher_| cancels any in-flight watch,
// which is what makes the base::Unretained(this) binding in Write() safe --
// confirm against SimpleWatcher.
StringDataPipeProducer::~StringDataPipeProducer() = default;

// Starts writing |data| to the pipe and arranges for |callback| to run once
// the operation finishes. Only one write may be outstanding at a time.
//
// |mode| decides how any part of |data| that does not fit immediately is
// retained: it is either copied into |data_| or referenced in place.
void StringDataPipeProducer::Write(const base::StringPiece& data,
                                   AsyncWritingMode mode,
                                   CompletionCallback callback) {
  DCHECK(!callback_);
  callback_ = std::move(callback);

  // Fast path: push as much as possible straight from the caller's buffer,
  // without making an extra copy.
  size_t bytes_written = data.size();
  const MojoResult write_result =
      WriteDataToProducerHandle(producer_.get(), data.data(), &bytes_written);
  const bool wrote_everything =
      write_result == MOJO_RESULT_OK && bytes_written == data.size();

  if (wrote_everything) {
    // Everything went out in one shot. Completion is still reported
    // asynchronously, through a weak pointer in case |this| goes away first.
    base::SequencedTaskRunnerHandle::Get()->PostTask(
        FROM_HERE, base::BindOnce(&StringDataPipeProducer::InvokeCallback,
                                  weak_factory_.GetWeakPtr(), MOJO_RESULT_OK));
    return;
  }

  // Hold on to whatever did not make the cut and retry once the pipe signals
  // that it is writable again.
  const char* const remaining_begin = data.data() + bytes_written;
  const size_t remaining_size = data.size() - bytes_written;
  if (mode == AsyncWritingMode::STRING_MAY_BE_INVALIDATED_BEFORE_COMPLETION) {
    // The caller's buffer may disappear, so keep a private copy.
    data_ = std::string(remaining_begin, remaining_size);
    data_view_ = data_;
  } else {
    data_view_ = base::StringPiece(remaining_begin, remaining_size);
  }

  watcher_.Watch(producer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
                 MOJO_WATCH_CONDITION_SATISFIED,
                 base::Bind(&StringDataPipeProducer::OnProducerHandleReady,
                            base::Unretained(this)));
}

// Hands |result| to the pending completion callback, consuming it.
void StringDataPipeProducer::InvokeCallback(MojoResult result) {
  // Detach the callback from |this| before running it: the callback may
  // delete |this|.
  CompletionCallback completion = std::move(callback_);
  std::move(completion).Run(result);
}

// Watcher notification handler: tries to flush the remaining |data_view_| to
// the pipe. Finishes the operation (cancelling the watch and running the
// completion callback) on failure or once everything has been written;
// otherwise just advances |data_view_| past the bytes that went out.
void StringDataPipeProducer::OnProducerHandleReady(
    MojoResult ready_result,
    const HandleSignalsState& state) {
  size_t bytes_written = data_view_.size();

  // A write is only attempted when the watcher reported success; a bad
  // notification or a bad write both count as failure.
  const bool failed =
      ready_result != MOJO_RESULT_OK ||
      WriteDataToProducerHandle(producer_.get(), data_view_.data(),
                                &bytes_written) != MOJO_RESULT_OK;
  const bool finished = failed || bytes_written == data_view_.size();

  if (finished) {
    const MojoResult outcome = failed ? MOJO_RESULT_ABORTED : MOJO_RESULT_OK;
    watcher_.Cancel();

    // May delete |this|.
    std::move(callback_).Run(outcome);
    return;
  }

  // Partial progress: drop the written prefix and wait for the next
  // notification.
  const char* const remaining_begin = data_view_.data() + bytes_written;
  const size_t remaining_size = data_view_.size() - bytes_written;
  data_view_ = base::StringPiece(remaining_begin, remaining_size);
}

}  // namespace mojo