// Copyright (c) 2012 The Chromium Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "chrome/browser/chromeos/drive/sync_client.h" #include <vector> #include "base/bind.h" #include "base/message_loop/message_loop_proxy.h" #include "chrome/browser/chromeos/drive/drive.pb.h" #include "chrome/browser/chromeos/drive/file_cache.h" #include "chrome/browser/chromeos/drive/file_system/download_operation.h" #include "chrome/browser/chromeos/drive/file_system/update_operation.h" #include "chrome/browser/chromeos/drive/file_system_util.h" #include "chrome/browser/chromeos/drive/sync/entry_update_performer.h" #include "content/public/browser/browser_thread.h" #include "google_apis/drive/task_util.h" using content::BrowserThread; namespace drive { namespace internal { namespace { // The delay constant is used to delay processing a sync task. We should not // process SyncTasks immediately for the following reasons: // // 1) For fetching, the user may accidentally click on "Make available // offline" checkbox on a file, and immediately cancel it in a second. // It's a waste to fetch the file in this case. // // 2) For uploading, file writing via HTML5 file system API is performed in // two steps: 1) truncate a file to 0 bytes, 2) write contents. We // shouldn't start uploading right after the step 1). Besides, the user // may edit the same file repeatedly in a short period of time. // // TODO(satorux): We should find a way to handle the upload case more nicely, // and shorten the delay. crbug.com/134774 const int kDelaySeconds = 5; // The delay constant is used to delay retrying a sync task on server errors. const int kLongDelaySeconds = 600; // Iterates entries and appends IDs to |to_fetch| if the file is pinned but not // fetched (not present locally), to |to_upload| if the file is dirty but not // uploaded, or to |to_remove| if the entry is in the trash. void CollectBacklog(ResourceMetadata* metadata, std::vector<std::string>* to_fetch, std::vector<std::string>* to_upload, std::vector<std::string>* to_update) { DCHECK(to_fetch); DCHECK(to_upload); DCHECK(to_update); scoped_ptr<ResourceMetadata::Iterator> it = metadata->GetIterator(); for (; !it->IsAtEnd(); it->Advance()) { const std::string& local_id = it->GetID(); const ResourceEntry& entry = it->GetValue(); if (entry.parent_local_id() == util::kDriveTrashDirLocalId) { to_update->push_back(local_id); continue; } switch (entry.metadata_edit_state()) { case ResourceEntry::CLEAN: break; case ResourceEntry::SYNCING: case ResourceEntry::DIRTY: to_update->push_back(local_id); break; } FileCacheEntry cache_entry; if (it->GetCacheEntry(&cache_entry)) { if (cache_entry.is_pinned() && !cache_entry.is_present()) to_fetch->push_back(local_id); if (cache_entry.is_dirty()) to_upload->push_back(local_id); } } DCHECK(!it->HasError()); } // Iterates cache entries and collects IDs of ones with obsolete cache files. void CheckExistingPinnedFiles(ResourceMetadata* metadata, FileCache* cache, std::vector<std::string>* local_ids) { scoped_ptr<FileCache::Iterator> it = cache->GetIterator(); for (; !it->IsAtEnd(); it->Advance()) { const FileCacheEntry& cache_entry = it->GetValue(); const std::string& local_id = it->GetID(); if (!cache_entry.is_pinned() || !cache_entry.is_present()) continue; ResourceEntry entry; FileError error = metadata->GetResourceEntryById(local_id, &entry); if (error != FILE_ERROR_OK) { LOG(WARNING) << "Entry not found: " << local_id; continue; } // If MD5s don't match, it indicates the local cache file is stale, unless // the file is dirty (the MD5 is "local"). We should never re-fetch the // file when we have a locally modified version. if (entry.file_specific_info().md5() == cache_entry.md5() || cache_entry.is_dirty()) continue; error = cache->Remove(local_id); if (error != FILE_ERROR_OK) { LOG(WARNING) << "Failed to remove cache entry: " << local_id; continue; } error = cache->Pin(local_id); if (error != FILE_ERROR_OK) { LOG(WARNING) << "Failed to pin cache entry: " << local_id; continue; } local_ids->push_back(local_id); } DCHECK(!it->HasError()); } } // namespace SyncClient::SyncTask::SyncTask() : state(PENDING), should_run_again(false) {} SyncClient::SyncTask::~SyncTask() {} SyncClient::SyncClient(base::SequencedTaskRunner* blocking_task_runner, file_system::OperationObserver* observer, JobScheduler* scheduler, ResourceMetadata* metadata, FileCache* cache, const base::FilePath& temporary_file_directory) : blocking_task_runner_(blocking_task_runner), metadata_(metadata), cache_(cache), download_operation_(new file_system::DownloadOperation( blocking_task_runner, observer, scheduler, metadata, cache, temporary_file_directory)), update_operation_(new file_system::UpdateOperation(blocking_task_runner, observer, scheduler, metadata, cache)), entry_update_performer_(new EntryUpdatePerformer(blocking_task_runner, observer, scheduler, metadata)), delay_(base::TimeDelta::FromSeconds(kDelaySeconds)), long_delay_(base::TimeDelta::FromSeconds(kLongDelaySeconds)), weak_ptr_factory_(this) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); } SyncClient::~SyncClient() { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); } void SyncClient::StartProcessingBacklog() { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); std::vector<std::string>* to_fetch = new std::vector<std::string>; std::vector<std::string>* to_upload = new std::vector<std::string>; std::vector<std::string>* to_remove = new std::vector<std::string>; blocking_task_runner_->PostTaskAndReply( FROM_HERE, base::Bind(&CollectBacklog, metadata_, to_fetch, to_upload, to_remove), base::Bind(&SyncClient::OnGetLocalIdsOfBacklog, weak_ptr_factory_.GetWeakPtr(), base::Owned(to_fetch), base::Owned(to_upload), base::Owned(to_remove))); } void SyncClient::StartCheckingExistingPinnedFiles() { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); std::vector<std::string>* local_ids = new std::vector<std::string>; blocking_task_runner_->PostTaskAndReply( FROM_HERE, base::Bind(&CheckExistingPinnedFiles, metadata_, cache_, local_ids), base::Bind(&SyncClient::AddFetchTasks, weak_ptr_factory_.GetWeakPtr(), base::Owned(local_ids))); } void SyncClient::AddFetchTask(const std::string& local_id) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); AddFetchTaskInternal(local_id, delay_); } void SyncClient::RemoveFetchTask(const std::string& local_id) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); SyncTasks::iterator it = tasks_.find(SyncTasks::key_type(FETCH, local_id)); if (it == tasks_.end()) return; SyncTask* task = &it->second; switch (task->state) { case PENDING: tasks_.erase(it); break; case RUNNING: // TODO(kinaba): Cancel tasks in JobScheduler as well. crbug.com/248856 break; } } void SyncClient::AddUploadTask(const ClientContext& context, const std::string& local_id) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); AddUploadTaskInternal(context, local_id, file_system::UpdateOperation::RUN_CONTENT_CHECK, delay_); } void SyncClient::AddUpdateTask(const std::string& local_id) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0)); } void SyncClient::AddFetchTaskInternal(const std::string& local_id, const base::TimeDelta& delay) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); SyncTask task; task.task = base::Bind( &file_system::DownloadOperation::EnsureFileDownloadedByLocalId, base::Unretained(download_operation_.get()), local_id, ClientContext(BACKGROUND), GetFileContentInitializedCallback(), google_apis::GetContentCallback(), base::Bind(&SyncClient::OnFetchFileComplete, weak_ptr_factory_.GetWeakPtr(), local_id)); AddTask(SyncTasks::key_type(FETCH, local_id), task, delay); } void SyncClient::AddUploadTaskInternal( const ClientContext& context, const std::string& local_id, file_system::UpdateOperation::ContentCheckMode content_check_mode, const base::TimeDelta& delay) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); SyncTask task; task.task = base::Bind( &file_system::UpdateOperation::UpdateFileByLocalId, base::Unretained(update_operation_.get()), local_id, context, content_check_mode, base::Bind(&SyncClient::OnUploadFileComplete, weak_ptr_factory_.GetWeakPtr(), local_id)); AddTask(SyncTasks::key_type(UPLOAD, local_id), task, delay); } void SyncClient::AddUpdateTaskInternal(const std::string& local_id, const base::TimeDelta& delay) { SyncTask task; task.task = base::Bind( &EntryUpdatePerformer::UpdateEntry, base::Unretained(entry_update_performer_.get()), local_id, base::Bind(&SyncClient::OnUpdateComplete, weak_ptr_factory_.GetWeakPtr(), local_id)); AddTask(SyncTasks::key_type(UPDATE, local_id), task, delay); } void SyncClient::AddTask(const SyncTasks::key_type& key, const SyncTask& task, const base::TimeDelta& delay) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); SyncTasks::iterator it = tasks_.find(key); if (it != tasks_.end()) { switch (it->second.state) { case PENDING: // The same task will run, do nothing. break; case RUNNING: // Something has changed since the task started. Schedule rerun. it->second.should_run_again = true; break; } return; } DCHECK_EQ(PENDING, task.state); tasks_[key] = task; base::MessageLoopProxy::current()->PostDelayedTask( FROM_HERE, base::Bind(&SyncClient::StartTask, weak_ptr_factory_.GetWeakPtr(), key), delay); } void SyncClient::StartTask(const SyncTasks::key_type& key) { SyncTasks::iterator it = tasks_.find(key); if (it == tasks_.end()) return; SyncTask* task = &it->second; switch (task->state) { case PENDING: task->state = RUNNING; task->task.Run(); break; case RUNNING: // Do nothing. break; } } void SyncClient::OnGetLocalIdsOfBacklog( const std::vector<std::string>* to_fetch, const std::vector<std::string>* to_upload, const std::vector<std::string>* to_update) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); // Give priority to upload tasks over fetch tasks, so that dirty files are // uploaded as soon as possible. for (size_t i = 0; i < to_upload->size(); ++i) { const std::string& local_id = (*to_upload)[i]; DVLOG(1) << "Queuing to upload: " << local_id; AddUploadTaskInternal(ClientContext(BACKGROUND), local_id, file_system::UpdateOperation::NO_CONTENT_CHECK, delay_); } for (size_t i = 0; i < to_fetch->size(); ++i) { const std::string& local_id = (*to_fetch)[i]; DVLOG(1) << "Queuing to fetch: " << local_id; AddFetchTaskInternal(local_id, delay_); } for (size_t i = 0; i < to_update->size(); ++i) { const std::string& local_id = (*to_update)[i]; DVLOG(1) << "Queuing to update: " << local_id; AddUpdateTask(local_id); } } void SyncClient::AddFetchTasks(const std::vector<std::string>* local_ids) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); for (size_t i = 0; i < local_ids->size(); ++i) AddFetchTask((*local_ids)[i]); } bool SyncClient::OnTaskComplete(SyncType type, const std::string& local_id) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); const SyncTasks::key_type key(type, local_id); SyncTasks::iterator it = tasks_.find(key); DCHECK(it != tasks_.end()); if (it->second.should_run_again) { DVLOG(1) << "Running again: type = " << type << ", id = " << local_id; it->second.should_run_again = false; it->second.task.Run(); return false; } tasks_.erase(it); return true; } void SyncClient::OnFetchFileComplete(const std::string& local_id, FileError error, const base::FilePath& local_path, scoped_ptr<ResourceEntry> entry) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); if (!OnTaskComplete(FETCH, local_id)) return; if (error == FILE_ERROR_OK) { DVLOG(1) << "Fetched " << local_id << ": " << local_path.value(); } else { switch (error) { case FILE_ERROR_ABORT: // If user cancels download, unpin the file so that we do not sync the // file again. base::PostTaskAndReplyWithResult( blocking_task_runner_, FROM_HERE, base::Bind(&FileCache::Unpin, base::Unretained(cache_), local_id), base::Bind(&util::EmptyFileOperationCallback)); break; case FILE_ERROR_NO_CONNECTION: // Add the task again so that we'll retry once the connection is back. AddFetchTaskInternal(local_id, delay_); break; case FILE_ERROR_SERVICE_UNAVAILABLE: // Add the task again so that we'll retry once the service is back. AddFetchTaskInternal(local_id, long_delay_); break; default: LOG(WARNING) << "Failed to fetch " << local_id << ": " << FileErrorToString(error); } } } void SyncClient::OnUploadFileComplete(const std::string& local_id, FileError error) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); if (!OnTaskComplete(UPLOAD, local_id)) return; if (error == FILE_ERROR_OK) { DVLOG(1) << "Uploaded " << local_id; } else { switch (error) { case FILE_ERROR_NO_CONNECTION: // Add the task again so that we'll retry once the connection is back. AddUploadTaskInternal(ClientContext(BACKGROUND), local_id, file_system::UpdateOperation::NO_CONTENT_CHECK, delay_); break; case FILE_ERROR_SERVICE_UNAVAILABLE: // Add the task again so that we'll retry once the service is back. AddUploadTaskInternal(ClientContext(BACKGROUND), local_id, file_system::UpdateOperation::NO_CONTENT_CHECK, long_delay_); break; default: LOG(WARNING) << "Failed to upload " << local_id << ": " << FileErrorToString(error); } } } void SyncClient::OnUpdateComplete(const std::string& local_id, FileError error) { DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI)); if (!OnTaskComplete(UPDATE, local_id)) return; if (error == FILE_ERROR_OK) { DVLOG(1) << "Updated " << local_id; } else { switch (error) { case FILE_ERROR_NO_CONNECTION: // Add the task again so that we'll retry once the connection is back. AddUpdateTaskInternal(local_id, base::TimeDelta::FromSeconds(0)); break; case FILE_ERROR_SERVICE_UNAVAILABLE: // Add the task again so that we'll retry once the service is back. AddUpdateTaskInternal(local_id, long_delay_); break; default: LOG(WARNING) << "Failed to update " << local_id << ": " << FileErrorToString(error); } } } } // namespace internal } // namespace drive