// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "mojo/public/cpp/system/string_data_pipe_producer.h"

#include <algorithm>
#include <limits>

#include "base/bind.h"
#include "base/callback.h"
#include "base/location.h"
#include "base/task_scheduler/post_task.h"
namespace mojo {
namespace {
// Attempts to write |*size| bytes starting at |data| to |producer|.
//
// On return, |*size| holds the number of bytes that were actually written
// (which may be anything from 0 up to the requested amount). The return value
// is the status of the last attempted write operation, except that
// MOJO_RESULT_SHOULD_WAIT is reported as MOJO_RESULT_OK: running out of pipe
// capacity is an expected outcome and is signalled to the caller by |*size|
// being smaller than requested rather than by an error code.
MojoResult WriteDataToProducerHandle(DataPipeProducerHandle producer,
                                     const char* data,
                                     size_t* size) {
  // A single two-phase write is sized with a uint32_t, so each request is
  // clamped to this limit. The remaining byte count itself is tracked as a
  // size_t so that inputs larger than 4 GiB are neither truncated nor
  // misreported as fully written.
  constexpr size_t kMaxBytesPerWrite = std::numeric_limits<uint32_t>::max();
  size_t bytes_left = *size;

  // We loop here since the pipe's available capacity may be larger than its
  // *contiguous* capacity, and hence two independent consecutive two-phase
  // writes may succeed. The goal here is to write as much of |data| as possible
  // until we either run out of data or run out of capacity.
  MojoResult result;
  do {
    void* dest = nullptr;
    const uint32_t requested =
        static_cast<uint32_t>(std::min(bytes_left, kMaxBytesPerWrite));
    uint32_t capacity = requested;
    result =
        producer.BeginWriteData(&dest, &capacity, MOJO_WRITE_DATA_FLAG_NONE);
    if (result == MOJO_RESULT_SHOULD_WAIT) {
      // No room right now. Not an error; the caller retries later.
      result = MOJO_RESULT_OK;
      break;
    }
    if (result != MOJO_RESULT_OK)
      break;

    // Never copy more than was asked for, even if the pipe reports a larger
    // contiguous buffer.
    capacity = std::min(capacity, requested);
    memcpy(dest, data, capacity);
    MojoResult end_result = producer.EndWriteData(capacity);
    DCHECK_EQ(MOJO_RESULT_OK, end_result);

    data += capacity;
    bytes_left -= capacity;
  } while (bytes_left);

  // Convert "requested" into "actually written".
  *size -= bytes_left;
  return result;
}
} // namespace
// Constructs a producer that writes into |producer|. The handle is moved into
// |producer_|. |watcher_| is created with the AUTOMATIC arming policy and the
// task runner returned by base::SequencedTaskRunnerHandle::Get() at
// construction time. |weak_factory_| is bound to this object.
StringDataPipeProducer::StringDataPipeProducer(
ScopedDataPipeProducerHandle producer)
: producer_(std::move(producer)),
watcher_(FROM_HERE,
SimpleWatcher::ArmingPolicy::AUTOMATIC,
base::SequencedTaskRunnerHandle::Get()),
weak_factory_(this) {}
// No explicit cleanup is performed here: the defaulted destructor only
// destroys the members.
StringDataPipeProducer::~StringDataPipeProducer() = default;
// Starts writing |data| to the pipe and stores |callback| to be run once the
// operation finishes. Must not be called while a previous callback is still
// pending (DCHECKed).
//
// If the whole string fits in the pipe right away, |callback| is invoked with
// MOJO_RESULT_OK from a posted task. Otherwise the unwritten tail is retained
// (copied into |data_| when |mode| says the caller's string may be invalidated
// before completion, or merely referenced when it does not) and the rest of
// the work happens in OnProducerHandleReady() as the pipe becomes writable.
void StringDataPipeProducer::Write(const base::StringPiece& data,
                                   AsyncWritingMode mode,
                                   CompletionCallback callback) {
  DCHECK(!callback_);
  callback_ = std::move(callback);

  // Try a synchronous write straight from the caller's buffer first, so that
  // no extra copy is made when everything fits in one shot.
  size_t written = data.size();
  const MojoResult first_attempt =
      WriteDataToProducerHandle(producer_.get(), data.data(), &written);

  const bool wrote_everything =
      first_attempt == MOJO_RESULT_OK && written == data.size();
  if (wrote_everything) {
    // Already done. Completion is still reported asynchronously, and through
    // a weak pointer so the task is dropped if |this| has gone away.
    base::SequencedTaskRunnerHandle::Get()->PostTask(
        FROM_HERE, base::BindOnce(&StringDataPipeProducer::InvokeCallback,
                                  weak_factory_.GetWeakPtr(), MOJO_RESULT_OK));
    return;
  }

  // Keep track of whatever did not make the cut and try again when the pipe
  // has some more capacity.
  const char* const leftover = data.data() + written;
  const size_t leftover_size = data.size() - written;
  if (mode == AsyncWritingMode::STRING_MAY_BE_INVALIDATED_BEFORE_COMPLETION) {
    data_ = std::string(leftover, leftover_size);
    data_view_ = data_;
  } else {
    data_view_ = base::StringPiece(leftover, leftover_size);
  }

  watcher_.Watch(producer_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
                 MOJO_WATCH_CONDITION_SATISFIED,
                 base::Bind(&StringDataPipeProducer::OnProducerHandleReady,
                            base::Unretained(this)));
}
// Runs the stored completion callback with |result|. Write() posts this as a
// task, bound to a weak pointer, when the whole string was written
// synchronously.
void StringDataPipeProducer::InvokeCallback(MojoResult result) {
// May delete |this|, so no member may be touched after this call.
std::move(callback_).Run(result);
}
// Watcher callback registered by Write(). Pushes as much of the pending data
// (|data_view_|) into the pipe as currently fits.
//
// - If |ready_result| or the write itself reports anything other than
//   MOJO_RESULT_OK, the watch is cancelled and the completion callback is run
//   with MOJO_RESULT_ABORTED.
// - If all pending data was written, the watch is cancelled and the completion
//   callback is run with MOJO_RESULT_OK.
// - Otherwise |data_view_| is advanced past the bytes just written and the
//   watch stays in place for the next notification.
void StringDataPipeProducer::OnProducerHandleReady(
    MojoResult ready_result,
    const HandleSignalsState& state) {
  size_t written = data_view_.size();
  bool succeeded = (ready_result == MOJO_RESULT_OK);
  if (succeeded) {
    const MojoResult write_result = WriteDataToProducerHandle(
        producer_.get(), data_view_.data(), &written);
    succeeded = (write_result == MOJO_RESULT_OK);
  }

  const bool finished = !succeeded || written == data_view_.size();
  if (!finished) {
    // Partial progress: drop the prefix that made it into the pipe and wait
    // for the next writable notification.
    data_view_.remove_prefix(written);
    return;
  }

  watcher_.Cancel();

  // May delete |this|.
  std::move(callback_).Run(succeeded ? MOJO_RESULT_OK : MOJO_RESULT_ABORTED);
}
} // namespace mojo