普通文本  |  290行  |  9.79 KB

// Copyright 2017 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "mojo/public/cpp/system/file_data_pipe_producer.h"

#include <algorithm>
#include <limits>
#include <memory>
#include <utility>

#include "base/bind.h"
#include "base/callback.h"
#include "base/location.h"
#include "base/memory/ref_counted_delete_on_sequence.h"
#include "base/numerics/safe_conversions.h"
#include "base/sequenced_task_runner.h"
#include "base/synchronization/lock.h"
#include "base/task_scheduler/post_task.h"
#include "base/threading/sequenced_task_runner_handle.h"
#include "mojo/public/cpp/system/simple_watcher.h"

namespace mojo {

namespace {

// No good reason not to attempt very large pipe transactions in case the data
// pipe in use has a very large capacity available, so we default to trying
// 64 MB chunks whenever a producer is writable.
constexpr uint32_t kDefaultMaxReadSize = 64 * 1024 * 1024;

MojoResult FileErrorToMojoResult(base::File::Error error) {
  switch (error) {
    case base::File::FILE_OK:
      return MOJO_RESULT_OK;
    case base::File::FILE_ERROR_NOT_FOUND:
      return MOJO_RESULT_NOT_FOUND;
    case base::File::FILE_ERROR_SECURITY:
    case base::File::FILE_ERROR_ACCESS_DENIED:
      return MOJO_RESULT_PERMISSION_DENIED;
    case base::File::FILE_ERROR_TOO_MANY_OPENED:
    case base::File::FILE_ERROR_NO_MEMORY:
      return MOJO_RESULT_RESOURCE_EXHAUSTED;
    case base::File::FILE_ERROR_ABORT:
      return MOJO_RESULT_ABORTED;
    default:
      return MOJO_RESULT_UNKNOWN;
  }
}

}  // namespace

class FileDataPipeProducer::FileSequenceState
    : public base::RefCountedDeleteOnSequence<FileSequenceState> {
 public:
  using CompletionCallback =
      base::OnceCallback<void(ScopedDataPipeProducerHandle producer,
                              MojoResult result)>;

  FileSequenceState(
      ScopedDataPipeProducerHandle producer_handle,
      scoped_refptr<base::SequencedTaskRunner> file_task_runner,
      CompletionCallback callback,
      scoped_refptr<base::SequencedTaskRunner> callback_task_runner,
      std::unique_ptr<Observer> observer)
      : base::RefCountedDeleteOnSequence<FileSequenceState>(
            std::move(file_task_runner)),
        callback_task_runner_(std::move(callback_task_runner)),
        producer_handle_(std::move(producer_handle)),
        callback_(std::move(callback)),
        observer_(std::move(observer)) {}

  void Cancel() {
    base::AutoLock lock(lock_);
    is_cancelled_ = true;
  }

  void StartFromFile(base::File file, size_t max_bytes) {
    owning_task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&FileSequenceState::StartFromFileOnFileSequence, this,
                       std::move(file), max_bytes));
  }

  void StartFromPath(const base::FilePath& path) {
    owning_task_runner()->PostTask(
        FROM_HERE,
        base::BindOnce(&FileSequenceState::StartFromPathOnFileSequence, this,
                       path));
  }

 private:
  friend class base::DeleteHelper<FileSequenceState>;
  friend class base::RefCountedDeleteOnSequence<FileSequenceState>;

  ~FileSequenceState() = default;

  void StartFromFileOnFileSequence(base::File file, size_t max_bytes) {
    if (file.error_details() != base::File::FILE_OK) {
      Finish(FileErrorToMojoResult(file.error_details()));
      return;
    }
    file_ = std::move(file);
    max_bytes_ = max_bytes;
    TransferSomeBytes();
    if (producer_handle_.is_valid()) {
      // If we didn't nail it all on the first transaction attempt, setup a
      // watcher and complete the read asynchronously.
      watcher_ = std::make_unique<SimpleWatcher>(
          FROM_HERE, SimpleWatcher::ArmingPolicy::AUTOMATIC,
          base::SequencedTaskRunnerHandle::Get());
      watcher_->Watch(producer_handle_.get(), MOJO_HANDLE_SIGNAL_WRITABLE,
                      MOJO_WATCH_CONDITION_SATISFIED,
                      base::Bind(&FileSequenceState::OnHandleReady, this));
    }
  }

  void StartFromPathOnFileSequence(const base::FilePath& path) {
    StartFromFileOnFileSequence(
        base::File(path, base::File::FLAG_OPEN | base::File::FLAG_READ),
        std::numeric_limits<size_t>::max());
  }

  void OnHandleReady(MojoResult result, const HandleSignalsState& state) {
    {
      // Stop ourselves from doing redundant work if we've been cancelled from
      // another thread. Note that we do not rely on this for any kind of thread
      // safety concerns.
      base::AutoLock lock(lock_);
      if (is_cancelled_)
        return;
    }

    if (result != MOJO_RESULT_OK) {
      // Either the consumer pipe has been closed or something terrible
      // happened. In any case, we'll never be able to write more data.
      Finish(result);
      return;
    }

    TransferSomeBytes();
  }

  void TransferSomeBytes() {
    while (true) {
      // Lock as much of the pipe as we can.
      void* pipe_buffer;
      uint32_t size = kDefaultMaxReadSize;
      MojoResult result = producer_handle_->BeginWriteData(
          &pipe_buffer, &size, MOJO_WRITE_DATA_FLAG_NONE);
      if (result == MOJO_RESULT_SHOULD_WAIT)
        return;
      if (result != MOJO_RESULT_OK) {
        Finish(result);
        return;
      }

      // Attempt to read that many bytes from the file, directly into the data
      // pipe. Note that while |max_bytes_remaining| may be very large, the
      // length we attempt read is bounded by the much smaller
      // |kDefaultMaxReadSize| via |size|.
      DCHECK(base::IsValueInRangeForNumericType<int>(size));
      const size_t max_bytes_remaining = max_bytes_ - bytes_transferred_;
      int attempted_read_size = static_cast<int>(
          std::min(static_cast<size_t>(size), max_bytes_remaining));
      int read_size = file_.ReadAtCurrentPos(static_cast<char*>(pipe_buffer),
                                             attempted_read_size);
      base::File::Error read_error;
      if (read_size < 0) {
        read_error = base::File::GetLastFileError();
        DCHECK_NE(base::File::FILE_OK, read_error);
        if (observer_)
          observer_->OnBytesRead(pipe_buffer, 0u, read_error);
      } else {
        read_error = base::File::FILE_OK;
        if (observer_) {
          observer_->OnBytesRead(pipe_buffer, static_cast<size_t>(read_size),
                                 base::File::FILE_OK);
        }
      }
      producer_handle_->EndWriteData(
          read_size >= 0 ? static_cast<uint32_t>(read_size) : 0);

      if (read_size < 0) {
        Finish(FileErrorToMojoResult(read_error));
        return;
      }

      bytes_transferred_ += read_size;
      DCHECK_LE(bytes_transferred_, max_bytes_);

      if (read_size < attempted_read_size) {
        // ReadAtCurrentPos makes a best effort to read all requested bytes. We
        // reasonably assume if it fails to read what we ask for, we've hit EOF.
        Finish(MOJO_RESULT_OK);
        return;
      }

      if (bytes_transferred_ == max_bytes_) {
        // We've read as much as we were asked to read.
        Finish(MOJO_RESULT_OK);
        return;
      }
    }
  }

  void Finish(MojoResult result) {
    if (observer_) {
      observer_->OnDoneReading();
      observer_ = nullptr;
    }
    watcher_.reset();
    callback_task_runner_->PostTask(
        FROM_HERE, base::BindOnce(std::move(callback_),
                                  std::move(producer_handle_), result));
  }

  const scoped_refptr<base::SequencedTaskRunner> callback_task_runner_;

  // State which is effectively owned and used only on the file sequence.
  ScopedDataPipeProducerHandle producer_handle_;
  base::File file_;
  size_t max_bytes_ = 0;
  size_t bytes_transferred_ = 0;
  CompletionCallback callback_;
  std::unique_ptr<SimpleWatcher> watcher_;

  // Guards |is_cancelled_|.
  base::Lock lock_;
  bool is_cancelled_ = false;
  std::unique_ptr<Observer> observer_;

  DISALLOW_COPY_AND_ASSIGN(FileSequenceState);
};

FileDataPipeProducer::FileDataPipeProducer(
    ScopedDataPipeProducerHandle producer,
    std::unique_ptr<Observer> observer)
    : producer_(std::move(producer)),
      observer_(std::move(observer)),
      weak_factory_(this) {}

FileDataPipeProducer::~FileDataPipeProducer() {
  if (file_sequence_state_)
    file_sequence_state_->Cancel();
}

void FileDataPipeProducer::WriteFromFile(base::File file,
                                         CompletionCallback callback) {
  WriteFromFile(std::move(file), std::numeric_limits<size_t>::max(),
                std::move(callback));
}

void FileDataPipeProducer::WriteFromFile(base::File file,
                                         size_t max_bytes,
                                         CompletionCallback callback) {
  InitializeNewRequest(std::move(callback));
  file_sequence_state_->StartFromFile(std::move(file), max_bytes);
}

void FileDataPipeProducer::WriteFromPath(const base::FilePath& path,
                                         CompletionCallback callback) {
  InitializeNewRequest(std::move(callback));
  file_sequence_state_->StartFromPath(path);
}

void FileDataPipeProducer::InitializeNewRequest(CompletionCallback callback) {
  DCHECK(!file_sequence_state_);

  LOG(FATAL) << "unsupported in libchrome";
  // auto file_task_runner = base::CreateSequencedTaskRunnerWithTraits(
  //     {base::MayBlock(), base::TaskPriority::BACKGROUND});
  // file_sequence_state_ = new FileSequenceState(
  //     std::move(producer_), file_task_runner,
  //     base::BindOnce(&FileDataPipeProducer::OnWriteComplete,
  //                    weak_factory_.GetWeakPtr(), std::move(callback)),
  //     base::SequencedTaskRunnerHandle::Get(), std::move(observer_));
}

void FileDataPipeProducer::OnWriteComplete(
    CompletionCallback callback,
    ScopedDataPipeProducerHandle producer,
    MojoResult ready_result) {
  producer_ = std::move(producer);
  file_sequence_state_ = nullptr;
  std::move(callback).Run(ready_result);
}

}  // namespace mojo