// Copyright 2014 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "device/serial/data_sender.h"
#include "base/bind.h"
#include "base/message_loop/message_loop.h"
#include "device/serial/async_waiter.h"
namespace device {
// Represents a send that is not yet fulfilled.
class DataSender::PendingSend {
public:
PendingSend(const base::StringPiece& data,
const DataSentCallback& callback,
const SendErrorCallback& error_callback,
int32_t fatal_error_value);
// Invoked to report that |num_bytes| of data have been sent. Subtracts the
// number of bytes that were part of this send from |num_bytes|. Returns
// whether this send has been completed. If this send has been completed, this
// calls |callback_|.
bool ReportBytesSent(uint32_t* num_bytes);
// Invoked to report that |num_bytes| of data have been sent and then an
// error, |error| was encountered. Subtracts the number of bytes that were
// part of this send from |num_bytes|. If this send was not completed before
// the error, this calls |error_callback_| to report the error. Otherwise,
// this calls |callback_|. Returns the number of bytes sent but not acked.
uint32_t ReportBytesSentAndError(uint32_t* num_bytes, int32_t error);
// Reports |fatal_error_value_| to |receive_error_callback_|.
void DispatchFatalError();
// Attempts to send any data not yet sent to |handle|. Returns MOJO_RESULT_OK
// if all data is sent, MOJO_RESULT_SHOULD_WAIT if not all of the data is sent
// or the error if one is encountered writing to |handle|.
MojoResult SendData(mojo::DataPipeProducerHandle handle);
private:
// Invoked to update |bytes_acked_| and |num_bytes|.
void ReportBytesSentInternal(uint32_t* num_bytes);
// The data to send.
const base::StringPiece data_;
// The callback to report success.
const DataSentCallback callback_;
// The callback to report errors.
const SendErrorCallback error_callback_;
// The error value to report when DispatchFatalError() is called.
const int32_t fatal_error_value_;
// The number of bytes sent to the data pipe.
uint32_t bytes_sent_;
// The number of bytes acked.
uint32_t bytes_acked_;
};
DataSender::DataSender(mojo::InterfacePtr<serial::DataSink> sink,
uint32_t buffer_size,
int32_t fatal_error_value)
: sink_(sink.Pass()),
fatal_error_value_(fatal_error_value),
shut_down_(false) {
sink_.set_error_handler(this);
MojoCreateDataPipeOptions options = {
sizeof(options), MOJO_CREATE_DATA_PIPE_OPTIONS_FLAG_NONE, 1, buffer_size,
};
options.struct_size = sizeof(options);
mojo::ScopedDataPipeConsumerHandle remote_handle;
MojoResult result = mojo::CreateDataPipe(&options, &handle_, &remote_handle);
DCHECK_EQ(MOJO_RESULT_OK, result);
sink_->Init(remote_handle.Pass());
sink_.set_client(this);
}
DataSender::~DataSender() {
ShutDown();
}
bool DataSender::Send(const base::StringPiece& data,
const DataSentCallback& callback,
const SendErrorCallback& error_callback) {
DCHECK(!callback.is_null() && !error_callback.is_null());
if (!pending_cancel_.is_null() || shut_down_)
return false;
pending_sends_.push(linked_ptr<PendingSend>(
new PendingSend(data, callback, error_callback, fatal_error_value_)));
SendInternal();
return true;
}
bool DataSender::Cancel(int32_t error, const CancelCallback& callback) {
DCHECK(!callback.is_null());
if (!pending_cancel_.is_null() || shut_down_)
return false;
if (pending_sends_.empty() && sends_awaiting_ack_.empty()) {
base::MessageLoop::current()->PostTask(FROM_HERE, callback);
return true;
}
pending_cancel_ = callback;
sink_->Cancel(error);
return true;
}
void DataSender::ReportBytesSent(uint32_t bytes_sent) {
if (shut_down_)
return;
while (bytes_sent != 0 && !sends_awaiting_ack_.empty() &&
sends_awaiting_ack_.front()->ReportBytesSent(&bytes_sent)) {
sends_awaiting_ack_.pop();
}
if (bytes_sent > 0 && !pending_sends_.empty()) {
bool finished = pending_sends_.front()->ReportBytesSent(&bytes_sent);
DCHECK(!finished);
if (finished) {
ShutDown();
return;
}
}
if (bytes_sent != 0) {
ShutDown();
return;
}
if (pending_sends_.empty() && sends_awaiting_ack_.empty())
RunCancelCallback();
}
void DataSender::ReportBytesSentAndError(
uint32_t bytes_sent,
int32_t error,
const mojo::Callback<void(uint32_t)>& callback) {
if (shut_down_)
return;
uint32_t bytes_to_flush = 0;
while (!sends_awaiting_ack_.empty()) {
bytes_to_flush += sends_awaiting_ack_.front()->ReportBytesSentAndError(
&bytes_sent, error);
sends_awaiting_ack_.pop();
}
while (!pending_sends_.empty()) {
bytes_to_flush +=
pending_sends_.front()->ReportBytesSentAndError(&bytes_sent, error);
pending_sends_.pop();
}
callback.Run(bytes_to_flush);
RunCancelCallback();
}
void DataSender::OnConnectionError() {
ShutDown();
}
void DataSender::SendInternal() {
while (!pending_sends_.empty()) {
MojoResult result = pending_sends_.front()->SendData(handle_.get());
if (result == MOJO_RESULT_OK) {
sends_awaiting_ack_.push(pending_sends_.front());
pending_sends_.pop();
} else if (result == MOJO_RESULT_SHOULD_WAIT) {
waiter_.reset(new AsyncWaiter(
handle_.get(),
MOJO_HANDLE_SIGNAL_WRITABLE,
base::Bind(&DataSender::OnDoneWaiting, base::Unretained(this))));
return;
} else {
ShutDown();
return;
}
}
}
void DataSender::OnDoneWaiting(MojoResult result) {
waiter_.reset();
if (result != MOJO_RESULT_OK) {
ShutDown();
return;
}
SendInternal();
}
void DataSender::RunCancelCallback() {
DCHECK(pending_sends_.empty() && sends_awaiting_ack_.empty());
if (pending_cancel_.is_null())
return;
base::MessageLoop::current()->PostTask(FROM_HERE,
base::Bind(pending_cancel_));
pending_cancel_.Reset();
}
void DataSender::ShutDown() {
waiter_.reset();
shut_down_ = true;
while (!pending_sends_.empty()) {
pending_sends_.front()->DispatchFatalError();
pending_sends_.pop();
}
while (!sends_awaiting_ack_.empty()) {
sends_awaiting_ack_.front()->DispatchFatalError();
sends_awaiting_ack_.pop();
}
RunCancelCallback();
}
DataSender::PendingSend::PendingSend(const base::StringPiece& data,
const DataSentCallback& callback,
const SendErrorCallback& error_callback,
int32_t fatal_error_value)
: data_(data),
callback_(callback),
error_callback_(error_callback),
fatal_error_value_(fatal_error_value),
bytes_sent_(0),
bytes_acked_(0) {
}
bool DataSender::PendingSend::ReportBytesSent(uint32_t* num_bytes) {
ReportBytesSentInternal(num_bytes);
if (bytes_acked_ < data_.size())
return false;
base::MessageLoop::current()->PostTask(FROM_HERE,
base::Bind(callback_, bytes_acked_));
return true;
}
uint32_t DataSender::PendingSend::ReportBytesSentAndError(uint32_t* num_bytes,
int32_t error) {
ReportBytesSentInternal(num_bytes);
if (*num_bytes > 0) {
base::MessageLoop::current()->PostTask(FROM_HERE,
base::Bind(callback_, bytes_acked_));
return 0;
}
base::MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(error_callback_, bytes_acked_, error));
return bytes_sent_ - bytes_acked_;
}
void DataSender::PendingSend::DispatchFatalError() {
base::MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(error_callback_, 0, fatal_error_value_));
}
MojoResult DataSender::PendingSend::SendData(
mojo::DataPipeProducerHandle handle) {
uint32_t bytes_to_send = static_cast<uint32_t>(data_.size()) - bytes_sent_;
MojoResult result = mojo::WriteDataRaw(handle,
data_.data() + bytes_sent_,
&bytes_to_send,
MOJO_WRITE_DATA_FLAG_NONE);
if (result != MOJO_RESULT_OK)
return result;
bytes_sent_ += bytes_to_send;
return bytes_sent_ == data_.size() ? MOJO_RESULT_OK : MOJO_RESULT_SHOULD_WAIT;
}
void DataSender::PendingSend::ReportBytesSentInternal(uint32_t* num_bytes) {
bytes_acked_ += *num_bytes;
if (bytes_acked_ > bytes_sent_) {
*num_bytes = bytes_acked_ - bytes_sent_;
bytes_acked_ = bytes_sent_;
} else {
*num_bytes = 0;
}
}
} // namespace device