// Copyright 2014 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "device/serial/data_sink_receiver.h" #include <limits> #include "base/bind.h" #include "device/serial/async_waiter.h" namespace device { // Represents a flush of data that has not been completed. class DataSinkReceiver::PendingFlush { public: PendingFlush(); // Initializes this PendingFlush with |num_bytes|, the number of bytes to // flush. void SetNumBytesToFlush(uint32_t num_bytes); // Attempts to discard |bytes_to_flush_| bytes from |handle|. Returns // MOJO_RESULT_OK on success, MOJO_RESULT_SHOULD_WAIT if fewer than // |bytes_to_flush_| bytes were flushed or the error if one is encountered // discarding data from |handle|. MojoResult Flush(mojo::DataPipeConsumerHandle handle); // Whether this PendingFlush has received the number of bytes to flush. bool received_flush() { return received_flush_; } private: // Whether this PendingFlush has received the number of bytes to flush. bool received_flush_; // The remaining number of bytes to flush. uint32_t bytes_to_flush_; }; // A ReadOnlyBuffer implementation that provides a view of a data pipe owned by // a DataSinkReceiver. class DataSinkReceiver::Buffer : public ReadOnlyBuffer { public: Buffer(scoped_refptr<DataSinkReceiver> receiver, const char* buffer, uint32_t buffer_size); virtual ~Buffer(); void Cancel(int32_t error); // ReadOnlyBuffer overrides. virtual const char* GetData() OVERRIDE; virtual uint32_t GetSize() OVERRIDE; virtual void Done(uint32_t bytes_read) OVERRIDE; virtual void DoneWithError(uint32_t bytes_read, int32_t error) OVERRIDE; private: // The DataSinkReceiver whose data pipe we are providing a view. scoped_refptr<DataSinkReceiver> receiver_; const char* buffer_; uint32_t buffer_size_; // Whether this receive has been cancelled. bool cancelled_; // If |cancelled_|, contains the cancellation error to report. 
int32_t cancellation_error_; }; DataSinkReceiver::DataSinkReceiver(const ReadyCallback& ready_callback, const CancelCallback& cancel_callback, const ErrorCallback& error_callback) : ready_callback_(ready_callback), cancel_callback_(cancel_callback), error_callback_(error_callback), buffer_in_use_(NULL), shut_down_(false), weak_factory_(this) { } void DataSinkReceiver::ShutDown() { shut_down_ = true; if (waiter_) waiter_.reset(); } DataSinkReceiver::~DataSinkReceiver() { } void DataSinkReceiver::Init(mojo::ScopedDataPipeConsumerHandle handle) { if (handle_.is_valid()) { DispatchFatalError(); return; } handle_ = handle.Pass(); StartWaiting(); } void DataSinkReceiver::Cancel(int32_t error) { // If we have sent a ReportBytesSentAndError but have not received the // response, that ReportBytesSentAndError message will appear to the // DataSinkClient to be caused by this Cancel message. In that case, we ignore // the cancel. if (!pending_flushes_.empty() && !pending_flushes_.back()->received_flush()) return; // If there is a buffer is in use, mark the buffer as cancelled and notify the // client by calling |cancel_callback_|. The sink implementation may or may // not take the cancellation into account when deciding what error (if any) to // return. If the sink returns an error, we ignore the cancellation error. // Otherwise, if the sink does not report an error, we override that with the // cancellation error. Once a cancellation has been received, the next report // sent to the client will always contain an error; the error returned by the // sink or the cancellation error if the sink does not return an error. if (buffer_in_use_) { buffer_in_use_->Cancel(error); if (!cancel_callback_.is_null()) cancel_callback_.Run(error); return; } // If there is no buffer in use, immediately report the error and cancel the // waiting for the data pipe if one exists. This transitions straight into the // state after the sink has returned an error. 
waiter_.reset(); ReportBytesSentAndError(0, error); } void DataSinkReceiver::OnConnectionError() { DispatchFatalError(); } void DataSinkReceiver::StartWaiting() { DCHECK(!waiter_ && !shut_down_); waiter_.reset( new AsyncWaiter(handle_.get(), MOJO_HANDLE_SIGNAL_READABLE, base::Bind(&DataSinkReceiver::OnDoneWaiting, this))); } void DataSinkReceiver::OnDoneWaiting(MojoResult result) { DCHECK(waiter_ && !shut_down_); waiter_.reset(); if (result != MOJO_RESULT_OK) { DispatchFatalError(); return; } // If there are any queued flushes (from ReportBytesSentAndError()), let them // flush data from the data pipe. if (!pending_flushes_.empty()) { MojoResult result = pending_flushes_.front()->Flush(handle_.get()); if (result == MOJO_RESULT_OK) { pending_flushes_.pop(); } else if (result != MOJO_RESULT_SHOULD_WAIT) { DispatchFatalError(); return; } StartWaiting(); return; } const void* data = NULL; uint32_t num_bytes = std::numeric_limits<uint32_t>::max(); result = mojo::BeginReadDataRaw( handle_.get(), &data, &num_bytes, MOJO_READ_DATA_FLAG_NONE); if (result != MOJO_RESULT_OK) { DispatchFatalError(); return; } buffer_in_use_ = new Buffer(this, static_cast<const char*>(data), num_bytes); ready_callback_.Run(scoped_ptr<ReadOnlyBuffer>(buffer_in_use_)); } void DataSinkReceiver::Done(uint32_t bytes_read) { if (!DoneInternal(bytes_read)) return; client()->ReportBytesSent(bytes_read); StartWaiting(); } void DataSinkReceiver::DoneWithError(uint32_t bytes_read, int32_t error) { if (!DoneInternal(bytes_read)) return; ReportBytesSentAndError(bytes_read, error); } bool DataSinkReceiver::DoneInternal(uint32_t bytes_read) { if (shut_down_) return false; DCHECK(buffer_in_use_); buffer_in_use_ = NULL; MojoResult result = mojo::EndReadDataRaw(handle_.get(), bytes_read); if (result != MOJO_RESULT_OK) { DispatchFatalError(); return false; } return true; } void DataSinkReceiver::ReportBytesSentAndError(uint32_t bytes_read, int32_t error) { // When we encounter an error, we must discard the data 
from any sends already // in the data pipe before we can resume dispatching data to the sink. We add // a pending flush here. The response containing the number of bytes to flush // is handled in SetNumBytesToFlush(). The actual flush is handled in // OnDoneWaiting(). pending_flushes_.push(linked_ptr<PendingFlush>(new PendingFlush())); client()->ReportBytesSentAndError( bytes_read, error, base::Bind(&DataSinkReceiver::SetNumBytesToFlush, weak_factory_.GetWeakPtr())); } void DataSinkReceiver::SetNumBytesToFlush(uint32_t bytes_to_flush) { DCHECK(!pending_flushes_.empty()); DCHECK(!pending_flushes_.back()->received_flush()); pending_flushes_.back()->SetNumBytesToFlush(bytes_to_flush); if (!waiter_) StartWaiting(); } void DataSinkReceiver::DispatchFatalError() { if (shut_down_) return; ShutDown(); if (!error_callback_.is_null()) error_callback_.Run(); } DataSinkReceiver::Buffer::Buffer(scoped_refptr<DataSinkReceiver> receiver, const char* buffer, uint32_t buffer_size) : receiver_(receiver), buffer_(buffer), buffer_size_(buffer_size), cancelled_(false), cancellation_error_(0) { } DataSinkReceiver::Buffer::~Buffer() { if (!receiver_.get()) return; if (cancelled_) receiver_->DoneWithError(0, cancellation_error_); else receiver_->Done(0); } void DataSinkReceiver::Buffer::Cancel(int32_t error) { cancelled_ = true; cancellation_error_ = error; } const char* DataSinkReceiver::Buffer::GetData() { return buffer_; } uint32_t DataSinkReceiver::Buffer::GetSize() { return buffer_size_; } void DataSinkReceiver::Buffer::Done(uint32_t bytes_read) { if (cancelled_) receiver_->DoneWithError(bytes_read, cancellation_error_); else receiver_->Done(bytes_read); receiver_ = NULL; buffer_ = NULL; buffer_size_ = 0; } void DataSinkReceiver::Buffer::DoneWithError(uint32_t bytes_read, int32_t error) { receiver_->DoneWithError(bytes_read, error); receiver_ = NULL; buffer_ = NULL; buffer_size_ = 0; } DataSinkReceiver::PendingFlush::PendingFlush() : received_flush_(false), bytes_to_flush_(0) { } 
// Records |num_bytes| as the amount of data this flush must discard and marks
// the flush as having received its byte count. May only be called once per
// PendingFlush.
void DataSinkReceiver::PendingFlush::SetNumBytesToFlush(uint32_t num_bytes) {
  DCHECK(!received_flush_);
  bytes_to_flush_ = num_bytes;
  received_flush_ = true;
}

// Discards up to the remaining |bytes_to_flush_| bytes from |handle|. Returns
// MOJO_RESULT_OK once nothing remains to be flushed, MOJO_RESULT_SHOULD_WAIT
// if some bytes are still outstanding, or the result of the read if it did not
// succeed. Requires that SetNumBytesToFlush() has already been called.
MojoResult DataSinkReceiver::PendingFlush::Flush(
    mojo::DataPipeConsumerHandle handle) {
  DCHECK(received_flush_);

  // In/out argument: holds the requested discard count going in; after the
  // call it is treated as the number of bytes actually discarded.
  uint32_t discarded = bytes_to_flush_;
  const MojoResult discard_result = mojo::ReadDataRaw(
      handle, NULL, &discarded, MOJO_READ_DATA_FLAG_DISCARD);
  if (discard_result != MOJO_RESULT_OK)
    return discard_result;

  DCHECK(discarded <= bytes_to_flush_);
  bytes_to_flush_ -= discarded;
  if (bytes_to_flush_ != 0)
    return MOJO_RESULT_SHOULD_WAIT;
  return MOJO_RESULT_OK;
}

}  // namespace device