普通文本  |  480行  |  16.47 KB

// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "chrome/browser/chromeos/drive/sync_client.h"

#include <vector>

#include "base/bind.h"
#include "base/message_loop/message_loop_proxy.h"
#include "chrome/browser/chromeos/drive/drive.pb.h"
#include "chrome/browser/chromeos/drive/file_cache.h"
#include "chrome/browser/chromeos/drive/file_system/download_operation.h"
#include "chrome/browser/chromeos/drive/file_system/update_operation.h"
#include "chrome/browser/chromeos/drive/file_system_util.h"
#include "chrome/browser/chromeos/drive/sync/entry_update_performer.h"
#include "content/public/browser/browser_thread.h"
#include "google_apis/drive/task_util.h"

using content::BrowserThread;

namespace drive {
namespace internal {

namespace {

// The delay constant is used to delay processing a sync task. We should not
// process SyncTasks immediately for the following reasons:
//
// 1) For fetching, the user may accidentally click on "Make available
//    offline" checkbox on a file, and immediately cancel it in a second.
//    It's a waste to fetch the file in this case.
//
// 2) For uploading, file writing via HTML5 file system API is performed in
//    two steps: 1) truncate a file to 0 bytes, 2) write contents. We
//    shouldn't start uploading right after the step 1). Besides, the user
//    may edit the same file repeatedly in a short period of time.
//
// TODO(satorux): We should find a way to handle the upload case more nicely,
// and shorten the delay. crbug.com/134774
const int kDelaySeconds = 5;

// The delay constant is used to delay retrying a sync task on server errors.
const int kLongDelaySeconds = 600;

// Iterates entries and appends IDs to |to_fetch| if the file is pinned but not
// fetched (not present locally), to |to_upload| if the file is dirty but not
// uploaded, or to |to_remove| if the entry is in the trash.
void CollectBacklog(ResourceMetadata* metadata,
                    std::vector<std::string>* to_fetch,
                    std::vector<std::string>* to_upload,
                    std::vector<std::string>* to_update) {
  DCHECK(to_fetch);
  DCHECK(to_upload);
  DCHECK(to_update);

  scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator();
  for (; !it->IsAtEnd(); it->Advance()) {
    const std::string& local_id = it->GetID();
    const ResourceEntry& entry = it->GetValue();
    if (entry.parent_local_id() == util::kDriveTrashDirLocalId) {
      to_update->push_back(local_id);
      continue;
    }

    switch (entry.metadata_edit_state()) {
      case ResourceEntry::CLEAN:
        break;
      case ResourceEntry::SYNCING:
      case ResourceEntry::DIRTY:
        to_update->push_back(local_id);
        break;
    }

    FileCacheEntry cache_entry;
    if (it->GetCacheEntry(&cache_entry)) {
      if (cache_entry.is_pinned() && !cache_entry.is_present())
        to_fetch->push_back(local_id);

      if (cache_entry.is_dirty())
        to_upload->push_back(local_id);
    }
  }
  DCHECK(!it->HasError());
}

// Iterates cache entries and collects IDs of ones with obsolete cache files.
void CheckExistingPinnedFiles(ResourceMetadata* metadata,
                              FileCache* cache,
                              std::vector<std::string>* local_ids) {
  scoped_ptr<FileCache::Iterator> it = cache->GetIterator();
  for (; !it->IsAtEnd(); it->Advance()) {
    const FileCacheEntry& cache_entry = it->GetValue();
    const std::string& local_id = it->GetID();
    if (!cache_entry.is_pinned() || !cache_entry.is_present())
      continue;

    ResourceEntry entry;
    FileError error = metadata->GetResourceEntryById(local_id, &entry);
    if (error != FILE_ERROR_OK) {
      LOG(WARNING) << "Entry not found: " << local_id;
      continue;
    }

    // If MD5s don't match, it indicates the local cache file is stale, unless
    // the file is dirty (the MD5 is "local"). We should never re-fetch the
    // file when we have a locally modified version.
    if (entry.file_specific_info().md5() == cache_entry.md5() ||
        cache_entry.is_dirty())
      continue;

    error = cache->Remove(local_id);
    if (error != FILE_ERROR_OK) {
      LOG(WARNING) << "Failed to remove cache entry: " << local_id;
      continue;
    }

    error = cache->Pin(local_id);
    if (error != FILE_ERROR_OK) {
      LOG(WARNING) << "Failed to pin cache entry: " << local_id;
      continue;
    }

    local_ids->push_back(local_id);
  }
  DCHECK(!it->HasError());
}

}  // namespace

SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {}
SyncClient::SyncTask::~SyncTask() {}

SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner,
                       file_system::OperationObserver* observer,
                       JobScheduler* scheduler,
                       ResourceMetadata* metadata,
                       FileCache* cache,
                       const base::FilePath& temporary_file_directory)
    : blocking_task_runner_(blocking_task_runner),
      metadata_(metadata),
      cache_(cache),
      download_operation_(new file_system::DownloadOperation(
          blocking_task_runner,
          observer,
          scheduler,
          metadata,
          cache,
          temporary_file_directory)),
      update_operation_(new file_system::UpdateOperation(blocking_task_runner,
                                                         observer,
                                                         scheduler,
                                                         metadata,
                                                         cache)),
      entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner,
                                                       observer,
                                                       scheduler,
                                                       metadata)),
      delay_(base::TimeDelta::FromSeconds(kDelaySeconds)),
      long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)),
      weak_ptr_factory_(this) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
}

SyncClient::~SyncClient() {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
}

void SyncClient::StartProcessingBacklog() {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  std::vector<std::string>* to_fetch = new std::vector<std::string>;
  std::vector<std::string>* to_upload = new std::vector<std::string>;
  std::vector<std::string>* to_remove = new std::vector<std::string>;
  blocking_task_runner_->PostTaskAndReply(
      FROM_HERE,
      base::Bind(&CollectBacklog, metadata_, to_fetch, to_upload, to_remove),
      base::Bind(&SyncClient::OnGetLocalIdsOfBacklog,
                 weak_ptr_factory_.GetWeakPtr(),
                 base::Owned(to_fetch),
                 base::Owned(to_upload),
                 base::Owned(to_remove)));
}

void SyncClient::StartCheckingExistingPinnedFiles() {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  std::vector<std::string>* local_ids = new std::vector<std::string>;
  blocking_task_runner_->PostTaskAndReply(
      FROM_HERE,
      base::Bind(&CheckExistingPinnedFiles,
                 metadata_,
                 cache_,
                 local_ids),
      base::Bind(&SyncClient::AddFetchTasks,
                 weak_ptr_factory_.GetWeakPtr(),
                 base::Owned(local_ids)));
}

void SyncClient::AddFetchTask(const std::string& local_id) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  AddFetchTaskInternal(local_id, delay_);
}

void SyncClient::RemoveFetchTask(const std::string& local_id) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id));
  if (it == tasks_.end())
    return;

  SyncTask* task = &it->second;
  switch (task->state) {
    case PENDING:
      tasks_.erase(it);
      break;
    case RUNNING:
      // TODO(kinaba): Cancel tasks in JobScheduler as well. crbug.com/248856
      break;
  }
}

void SyncClient::AddUploadTask(const ClientContext& context,
                               const std::string& local_id) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  AddUploadTaskInternal(context, local_id,
                        file_system::UpdateOperation::RUN_CONTENT_CHECK,
                        delay_);
}

void SyncClient::AddUpdateTask(const std::string& local_id) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
  AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
}

void SyncClient::AddFetchTaskInternal(const std::string& local_id,
                                      const base::TimeDelta& delay) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  SyncTask task;
  task.task = base::Bind(
      &file_system::DownloadOperation::EnsureFileDownloadedByLocalId,
      base::Unretained(download_operation_.get()),
      local_id,
      ClientContext(BACKGROUND),
      GetFileContentInitializedCallback(),
      google_apis::GetContentCallback(),
      base::Bind(&SyncClient::OnFetchFileComplete,
                 weak_ptr_factory_.GetWeakPtr(),
                 local_id));
  AddTask(SyncTasks::key_type(FETCH, local_id), task, delay);
}

void SyncClient::AddUploadTaskInternal(
    const ClientContext& context,
    const std::string& local_id,
    file_system::UpdateOperation::ContentCheckMode content_check_mode,
    const base::TimeDelta& delay) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  SyncTask task;
  task.task = base::Bind(
      &file_system::UpdateOperation::UpdateFileByLocalId,
      base::Unretained(update_operation_.get()),
      local_id,
      context,
      content_check_mode,
      base::Bind(&SyncClient::OnUploadFileComplete,
                 weak_ptr_factory_.GetWeakPtr(),
                 local_id));
  AddTask(SyncTasks::key_type(UPLOAD, local_id), task, delay);
}

void SyncClient::AddUpdateTaskInternal(const std::string& local_id,
                                       const base::TimeDelta& delay) {
  SyncTask task;
  task.task = base::Bind(
      &EntryUpdatePerformer::UpdateEntry,
      base::Unretained(entry_update_performer_.get()),
      local_id,
      base::Bind(&SyncClient::OnUpdateComplete,
                 weak_ptr_factory_.GetWeakPtr(),
                 local_id));
  AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay);
}

void SyncClient::AddTask(const SyncTasks::key_type& key,
                         const SyncTask& task,
                         const base::TimeDelta& delay) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  SyncTasks::iterator it = tasks_.find(key);
  if (it != tasks_.end()) {
    switch (it->second.state) {
      case PENDING:
        // The same task will run, do nothing.
        break;
      case RUNNING:
        // Something has changed since the task started. Schedule rerun.
        it->second.should_run_again = true;
        break;
    }
    return;
  }

  DCHECK_EQ(PENDING, task.state);
  tasks_[key] = task;

  base::MessageLoopProxy::current()->PostDelayedTask(
      FROM_HERE,
      base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key),
      delay);
}

void SyncClient::StartTask(const SyncTasks::key_type& key) {
  SyncTasks::iterator it = tasks_.find(key);
  if (it == tasks_.end())
    return;

  SyncTask* task = &it->second;
  switch (task->state) {
    case PENDING:
      task->state = RUNNING;
      task->task.Run();
      break;
    case RUNNING:  // Do nothing.
      break;
  }
}

void SyncClient::OnGetLocalIdsOfBacklog(
    const std::vector<std::string>* to_fetch,
    const std::vector<std::string>* to_upload,
    const std::vector<std::string>* to_update) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  // Give priority to upload tasks over fetch tasks, so that dirty files are
  // uploaded as soon as possible.
  for (size_t i = 0; i < to_upload->size(); ++i) {
    const std::string& local_id = (*to_upload)[i];
    DVLOG(1) << "Queuing to upload: " << local_id;
    AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
                          file_system::UpdateOperation::NO_CONTENT_CHECK,
                          delay_);
  }

  for (size_t i = 0; i < to_fetch->size(); ++i) {
    const std::string& local_id = (*to_fetch)[i];
    DVLOG(1) << "Queuing to fetch: " << local_id;
    AddFetchTaskInternal(local_id, delay_);
  }

  for (size_t i = 0; i < to_update->size(); ++i) {
    const std::string& local_id = (*to_update)[i];
    DVLOG(1) << "Queuing to update: " << local_id;
    AddUpdateTask(local_id);
  }
}

void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  for (size_t i = 0; i < local_ids->size(); ++i)
    AddFetchTask((*local_ids)[i]);
}

bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  const SyncTasks::key_type key(type, local_id);
  SyncTasks::iterator it = tasks_.find(key);
  DCHECK(it != tasks_.end());

  if (it->second.should_run_again) {
    DVLOG(1) << "Running again: type = " << type << ", id = " << local_id;
    it->second.should_run_again = false;
    it->second.task.Run();
    return false;
  }

  tasks_.erase(it);
  return true;
}

void SyncClient::OnFetchFileComplete(const std::string& local_id,
                                     FileError error,
                                     const base::FilePath& local_path,
                                     scoped_ptr<ResourceEntry> entry) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  if (!OnTaskComplete(FETCH, local_id))
    return;

  if (error == FILE_ERROR_OK) {
    DVLOG(1) << "Fetched " << local_id << ": " << local_path.value();
  } else {
    switch (error) {
      case FILE_ERROR_ABORT:
        // If user cancels download, unpin the file so that we do not sync the
        // file again.
        base::PostTaskAndReplyWithResult(
            blocking_task_runner_,
            FROM_HERE,
            base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id),
            base::Bind(&util::EmptyFileOperationCallback));
        break;
      case FILE_ERROR_NO_CONNECTION:
        // Add the task again so that we'll retry once the connection is back.
        AddFetchTaskInternal(local_id, delay_);
        break;
      case FILE_ERROR_SERVICE_UNAVAILABLE:
        // Add the task again so that we'll retry once the service is back.
        AddFetchTaskInternal(local_id, long_delay_);
        break;
      default:
        LOG(WARNING) << "Failed to fetch " << local_id
                     << ": " << FileErrorToString(error);
    }
  }
}

void SyncClient::OnUploadFileComplete(const std::string& local_id,
                                      FileError error) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  if (!OnTaskComplete(UPLOAD, local_id))
    return;

  if (error == FILE_ERROR_OK) {
    DVLOG(1) << "Uploaded " << local_id;
  } else {
    switch (error) {
      case FILE_ERROR_NO_CONNECTION:
        // Add the task again so that we'll retry once the connection is back.
        AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
                              file_system::UpdateOperation::NO_CONTENT_CHECK,
                              delay_);
        break;
      case FILE_ERROR_SERVICE_UNAVAILABLE:
        // Add the task again so that we'll retry once the service is back.
        AddUploadTaskInternal(ClientContext(BACKGROUND), local_id,
                              file_system::UpdateOperation::NO_CONTENT_CHECK,
                              long_delay_);
        break;
      default:
        LOG(WARNING) << "Failed to upload " << local_id << ": "
                     << FileErrorToString(error);
    }
  }
}

void SyncClient::OnUpdateComplete(const std::string& local_id,
                                  FileError error) {
  DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));

  if (!OnTaskComplete(UPDATE, local_id))
    return;

  if (error == FILE_ERROR_OK) {
    DVLOG(1) << "Updated " << local_id;
  } else {
    switch (error) {
      case FILE_ERROR_NO_CONNECTION:
        // Add the task again so that we'll retry once the connection is back.
        AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0));
        break;
      case FILE_ERROR_SERVICE_UNAVAILABLE:
        // Add the task again so that we'll retry once the service is back.
        AddUpdateTaskInternal(local_id, long_delay_);
        break;
      default:
        LOG(WARNING) << "Failed to update " << local_id << ": "
                     << FileErrorToString(error);
    }
  }
}

}  // namespace internal
}  // namespace drive