// Copyright (c) 2006-2008 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/disk_cache/file.h"
#include <fcntl.h>
#include <set>
#include "base/logging.h"
#include "base/message_loop.h"
#include "base/singleton.h"
#include "base/waitable_event.h"
#include "base/worker_pool.h"
#include "net/disk_cache/disk_cache.h"
namespace {
class InFlightIO;
// This class represents a single asynchronous IO operation while it is being
// bounced between threads.
class BackgroundIO : public base::RefCountedThreadSafe<BackgroundIO> {
public:
// Other than the actual parameters for the IO operation (including the
// |callback| that must be notified at the end), we need the controller that
// is keeping track of all operations. When done, we notify the controller
// (we do NOT invoke the callback), in the worker thead that completed the
// operation.
BackgroundIO(disk_cache::File* file, const void* buf, size_t buf_len,
size_t offset, disk_cache::FileIOCallback* callback,
InFlightIO* controller)
: io_completed_(true, false), callback_(callback), file_(file), buf_(buf),
buf_len_(buf_len), offset_(offset), controller_(controller),
bytes_(0) {}
// Read and Write are the operations that can be performed asynchronously.
// The actual parameters for the operation are setup in the constructor of
// the object, with the exception of |delete_buffer|, that allows a write
// without a callback. Both methods should be called from a worker thread, by
// posting a task to the WorkerPool (they are RunnableMethods). When finished,
// controller->OnIOComplete() is called.
void Read();
void Write(bool delete_buffer);
// This method signals the controller that this operation is finished, in the
// original thread (presumably the IO-Thread). In practice, this is a
// RunableMethod that allows cancellation.
void OnIOSignalled();
// Allows the cancellation of the task to notify the controller (step number 7
// in the diagram below). In practice, if the controller waits for the
// operation to finish it doesn't have to wait for the final task to be
// processed by the message loop so calling this method prevents its delivery.
// Note that this method is not intended to cancel the actual IO operation or
// to prevent the first notification to take place (OnIOComplete).
void Cancel();
// Retrieves the number of bytes transfered.
int Result();
base::WaitableEvent* io_completed() {
return &io_completed_;
}
disk_cache::FileIOCallback* callback() {
return callback_;
}
disk_cache::File* file() {
return file_;
}
private:
friend class base::RefCountedThreadSafe<BackgroundIO>;
~BackgroundIO() {}
// An event to signal when the operation completes, and the user callback that
// has to be invoked. These members are accessed directly by the controller.
base::WaitableEvent io_completed_;
disk_cache::FileIOCallback* callback_;
disk_cache::File* file_;
const void* buf_;
size_t buf_len_;
size_t offset_;
InFlightIO* controller_; // The controller that tracks all operations.
int bytes_; // Final operation result.
DISALLOW_COPY_AND_ASSIGN(BackgroundIO);
};
// This class keeps track of every asynchronous IO operation. A single instance
// of this class is meant to be used to start an asynchronous operation (using
// PostRead/PostWrite). This class will post the operation to a worker thread,
// hanlde the notification when the operation finishes and perform the callback
// on the same thread that was used to start the operation.
//
// The regular sequence of calls is:
// Thread_1 Worker_thread
// 1. InFlightIO::PostRead()
// 2. -> PostTask ->
// 3. BackgroundIO::Read()
// 4. IO operation completes
// 5. InFlightIO::OnIOComplete()
// 6. <- PostTask <-
// 7. BackgroundIO::OnIOSignalled()
// 8. InFlightIO::InvokeCallback()
// 9. invoke callback
//
// Shutdown is a special case that is handled though WaitForPendingIO() instead
// of just waiting for step 7.
class InFlightIO {
public:
InFlightIO() : callback_thread_(MessageLoop::current()) {}
~InFlightIO() {}
// These methods start an asynchronous operation. The arguments have the same
// semantics of the File asynchronous operations, with the exception that the
// operation never finishes synchronously.
void PostRead(disk_cache::File* file, void* buf, size_t buf_len,
size_t offset, disk_cache::FileIOCallback* callback);
void PostWrite(disk_cache::File* file, const void* buf, size_t buf_len,
size_t offset, disk_cache::FileIOCallback* callback,
bool delete_buffer);
// Blocks the current thread until all IO operations tracked by this object
// complete.
void WaitForPendingIO();
// Called on a worker thread when |operation| completes.
void OnIOComplete(BackgroundIO* operation);
// Invokes the users' completion callback at the end of the IO operation.
// |cancel_task| is true if the actual task posted to the thread is still
// queued (because we are inside WaitForPendingIO), and false if said task is
// the one performing the call.
void InvokeCallback(BackgroundIO* operation, bool cancel_task);
private:
typedef std::set<scoped_refptr<BackgroundIO> > IOList;
IOList io_list_; // List of pending io operations.
MessageLoop* callback_thread_;
};
// ---------------------------------------------------------------------------
// Runs on a worker thread.
void BackgroundIO::Read() {
if (file_->Read(const_cast<void*>(buf_), buf_len_, offset_)) {
bytes_ = static_cast<int>(buf_len_);
} else {
bytes_ = -1;
}
controller_->OnIOComplete(this);
}
int BackgroundIO::Result() {
return bytes_;
}
void BackgroundIO::Cancel() {
DCHECK(controller_);
controller_ = NULL;
}
// Runs on a worker thread.
void BackgroundIO::Write(bool delete_buffer) {
bool rv = file_->Write(buf_, buf_len_, offset_);
if (delete_buffer) {
// TODO(rvargas): remove or update this code.
delete[] reinterpret_cast<const char*>(buf_);
}
bytes_ = rv ? static_cast<int>(buf_len_) : -1;
controller_->OnIOComplete(this);
}
// Runs on the IO thread.
void BackgroundIO::OnIOSignalled() {
if (controller_)
controller_->InvokeCallback(this, false);
}
// ---------------------------------------------------------------------------
void InFlightIO::PostRead(disk_cache::File *file, void* buf, size_t buf_len,
size_t offset, disk_cache::FileIOCallback *callback) {
scoped_refptr<BackgroundIO> operation =
new BackgroundIO(file, buf, buf_len, offset, callback, this);
io_list_.insert(operation.get());
file->AddRef(); // Balanced on InvokeCallback()
WorkerPool::PostTask(FROM_HERE,
NewRunnableMethod(operation.get(), &BackgroundIO::Read),
true);
}
void InFlightIO::PostWrite(disk_cache::File* file, const void* buf,
size_t buf_len, size_t offset,
disk_cache::FileIOCallback* callback,
bool delete_buffer) {
scoped_refptr<BackgroundIO> operation =
new BackgroundIO(file, buf, buf_len, offset, callback, this);
io_list_.insert(operation.get());
file->AddRef(); // Balanced on InvokeCallback()
WorkerPool::PostTask(FROM_HERE,
NewRunnableMethod(operation.get(), &BackgroundIO::Write,
delete_buffer),
true);
}
void InFlightIO::WaitForPendingIO() {
while (!io_list_.empty()) {
// Block the current thread until all pending IO completes.
IOList::iterator it = io_list_.begin();
InvokeCallback(*it, true);
}
}
// Runs on a worker thread.
void InFlightIO::OnIOComplete(BackgroundIO* operation) {
callback_thread_->PostTask(FROM_HERE,
NewRunnableMethod(operation,
&BackgroundIO::OnIOSignalled));
operation->io_completed()->Signal();
}
// Runs on the IO thread.
void InFlightIO::InvokeCallback(BackgroundIO* operation, bool cancel_task) {
operation->io_completed()->Wait();
if (cancel_task)
operation->Cancel();
disk_cache::FileIOCallback* callback = operation->callback();
int bytes = operation->Result();
// Release the references acquired in PostRead / PostWrite.
operation->file()->Release();
io_list_.erase(operation);
callback->OnFileIOComplete(bytes);
}
} // namespace
namespace disk_cache {
File::File(base::PlatformFile file)
: init_(true), mixed_(true), platform_file_(file) {
}
bool File::Init(const FilePath& name) {
if (init_)
return false;
int flags = base::PLATFORM_FILE_OPEN |
base::PLATFORM_FILE_READ |
base::PLATFORM_FILE_WRITE;
platform_file_ = base::CreatePlatformFile(name, flags, NULL);
if (platform_file_ < 0) {
platform_file_ = 0;
return false;
}
init_ = true;
return true;
}
File::~File() {
if (platform_file_)
close(platform_file_);
}
base::PlatformFile File::platform_file() const {
return platform_file_;
}
bool File::IsValid() const {
if (!init_)
return false;
return (base::kInvalidPlatformFileValue != platform_file_);
}
bool File::Read(void* buffer, size_t buffer_len, size_t offset) {
DCHECK(init_);
if (buffer_len > ULONG_MAX || offset > LONG_MAX)
return false;
int ret = pread(platform_file_, buffer, buffer_len, offset);
return (static_cast<size_t>(ret) == buffer_len);
}
bool File::Write(const void* buffer, size_t buffer_len, size_t offset) {
DCHECK(init_);
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
int ret = pwrite(platform_file_, buffer, buffer_len, offset);
return (static_cast<size_t>(ret) == buffer_len);
}
// We have to increase the ref counter of the file before performing the IO to
// prevent the completion to happen with an invalid handle (if the file is
// closed while the IO is in flight).
bool File::Read(void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
if (!callback) {
if (completed)
*completed = true;
return Read(buffer, buffer_len, offset);
}
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
InFlightIO* io_operations = Singleton<InFlightIO>::get();
io_operations->PostRead(this, buffer, buffer_len, offset, callback);
*completed = false;
return true;
}
bool File::Write(const void* buffer, size_t buffer_len, size_t offset,
FileIOCallback* callback, bool* completed) {
DCHECK(init_);
if (!callback) {
if (completed)
*completed = true;
return Write(buffer, buffer_len, offset);
}
return AsyncWrite(buffer, buffer_len, offset, true, callback, completed);
}
bool File::PostWrite(const void* buffer, size_t buffer_len, size_t offset) {
DCHECK(init_);
return AsyncWrite(buffer, buffer_len, offset, false, NULL, NULL);
}
bool File::AsyncWrite(const void* buffer, size_t buffer_len, size_t offset,
bool notify, FileIOCallback* callback, bool* completed) {
DCHECK(init_);
if (buffer_len > ULONG_MAX || offset > ULONG_MAX)
return false;
InFlightIO* io_operations = Singleton<InFlightIO>::get();
io_operations->PostWrite(this, buffer, buffer_len, offset, callback, !notify);
if (completed)
*completed = false;
return true;
}
bool File::SetLength(size_t length) {
DCHECK(init_);
if (length > ULONG_MAX)
return false;
return 0 == ftruncate(platform_file_, length);
}
size_t File::GetLength() {
DCHECK(init_);
size_t ret = lseek(platform_file_, 0, SEEK_END);
return ret;
}
// Static.
void File::WaitForPendingIO(int* num_pending_io) {
if (*num_pending_io)
Singleton<InFlightIO>::get()->WaitForPendingIO();
}
} // namespace disk_cache