// Copyright (c) 2009 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "net/proxy/single_threaded_proxy_resolver.h"
#include "base/thread.h"
#include "net/base/load_log.h"
#include "net/base/net_errors.h"
#include "net/proxy/proxy_info.h"
namespace net {
namespace {
class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
public:
explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
void PurgeMemory() { resolver_->PurgeMemory(); }
private:
friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
~PurgeMemoryTask() {}
ProxyResolver* resolver_;
};
}
// SingleThreadedProxyResolver::SetPacScriptTask ------------------------------
// Runs on the worker thread to call ProxyResolver::SetPacScript.
class SingleThreadedProxyResolver::SetPacScriptTask
: public base::RefCountedThreadSafe<
SingleThreadedProxyResolver::SetPacScriptTask> {
public:
SetPacScriptTask(SingleThreadedProxyResolver* coordinator,
const GURL& pac_url,
const std::string& pac_bytes,
CompletionCallback* callback)
: coordinator_(coordinator),
callback_(callback),
pac_bytes_(pac_bytes),
pac_url_(pac_url),
origin_loop_(MessageLoop::current()) {
DCHECK(callback);
}
// Start the SetPacScript request on the worker thread.
void Start() {
coordinator_->thread()->message_loop()->PostTask(
FROM_HERE, NewRunnableMethod(this, &SetPacScriptTask::DoRequest,
coordinator_->resolver_.get()));
}
void Cancel() {
// Clear these to inform RequestComplete that it should not try to
// access them.
coordinator_ = NULL;
callback_ = NULL;
}
// Returns true if Cancel() has been called.
bool was_cancelled() const { return callback_ == NULL; }
private:
friend class base::RefCountedThreadSafe<
SingleThreadedProxyResolver::SetPacScriptTask>;
~SetPacScriptTask() {}
// Runs on the worker thread.
void DoRequest(ProxyResolver* resolver) {
int rv = resolver->expects_pac_bytes() ?
resolver->SetPacScriptByData(pac_bytes_, NULL) :
resolver->SetPacScriptByUrl(pac_url_, NULL);
DCHECK_NE(rv, ERR_IO_PENDING);
origin_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &SetPacScriptTask::RequestComplete, rv));
}
// Runs the completion callback on the origin thread.
void RequestComplete(int result_code) {
// The task may have been cancelled after it was started.
if (!was_cancelled()) {
CompletionCallback* callback = callback_;
coordinator_->RemoveOutstandingSetPacScriptTask(this);
callback->Run(result_code);
}
}
// Must only be used on the "origin" thread.
SingleThreadedProxyResolver* coordinator_;
CompletionCallback* callback_;
std::string pac_bytes_;
GURL pac_url_;
// Usable from within DoQuery on the worker thread.
MessageLoop* origin_loop_;
};
// SingleThreadedProxyResolver::Job -------------------------------------------
class SingleThreadedProxyResolver::Job
: public base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job> {
public:
// |coordinator| -- the SingleThreadedProxyResolver that owns this job.
// |url| -- the URL of the query.
// |results| -- the structure to fill with proxy resolve results.
Job(SingleThreadedProxyResolver* coordinator,
const GURL& url,
ProxyInfo* results,
CompletionCallback* callback,
LoadLog* load_log)
: coordinator_(coordinator),
callback_(callback),
results_(results),
load_log_(load_log),
url_(url),
is_started_(false),
origin_loop_(MessageLoop::current()) {
DCHECK(callback);
}
// Start the resolve proxy request on the worker thread.
void Start() {
is_started_ = true;
size_t load_log_bound = load_log_ ? load_log_->max_num_entries() : 0;
coordinator_->thread()->message_loop()->PostTask(
FROM_HERE, NewRunnableMethod(this, &Job::DoQuery,
coordinator_->resolver_.get(),
load_log_bound));
}
bool is_started() const { return is_started_; }
void Cancel() {
// Clear these to inform QueryComplete that it should not try to
// access them.
coordinator_ = NULL;
callback_ = NULL;
results_ = NULL;
}
// Returns true if Cancel() has been called.
bool was_cancelled() const { return callback_ == NULL; }
private:
friend class base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job>;
~Job() {}
// Runs on the worker thread.
void DoQuery(ProxyResolver* resolver, size_t load_log_bound) {
LoadLog* worker_log = NULL;
if (load_log_bound > 0) {
worker_log = new LoadLog(load_log_bound);
worker_log->AddRef(); // Balanced in QueryComplete.
}
int rv = resolver->GetProxyForURL(url_, &results_buf_, NULL, NULL,
worker_log);
DCHECK_NE(rv, ERR_IO_PENDING);
origin_loop_->PostTask(FROM_HERE,
NewRunnableMethod(this, &Job::QueryComplete, rv, worker_log));
}
// Runs the completion callback on the origin thread.
void QueryComplete(int result_code, LoadLog* worker_log) {
// Merge the load log that was generated on the worker thread, into the
// main log.
if (worker_log) {
if (load_log_)
load_log_->Append(worker_log);
worker_log->Release();
}
// The Job may have been cancelled after it was started.
if (!was_cancelled()) {
if (result_code >= OK) { // Note: unit-tests use values > 0.
results_->Use(results_buf_);
}
callback_->Run(result_code);
// We check for cancellation once again, in case the callback deleted
// the owning ProxyService (whose destructor will in turn cancel us).
if (!was_cancelled())
coordinator_->RemoveFrontOfJobsQueueAndStartNext(this);
}
}
// Must only be used on the "origin" thread.
SingleThreadedProxyResolver* coordinator_;
CompletionCallback* callback_;
ProxyInfo* results_;
scoped_refptr<LoadLog> load_log_;
GURL url_;
bool is_started_;
// Usable from within DoQuery on the worker thread.
ProxyInfo results_buf_;
MessageLoop* origin_loop_;
};
// SingleThreadedProxyResolver ------------------------------------------------
SingleThreadedProxyResolver::SingleThreadedProxyResolver(
ProxyResolver* resolver)
: ProxyResolver(resolver->expects_pac_bytes()),
resolver_(resolver) {
}
SingleThreadedProxyResolver::~SingleThreadedProxyResolver() {
// Cancel the inprogress job (if any), and free the rest.
for (PendingJobsQueue::iterator it = pending_jobs_.begin();
it != pending_jobs_.end();
++it) {
(*it)->Cancel();
}
if (outstanding_set_pac_script_task_)
outstanding_set_pac_script_task_->Cancel();
// Note that |thread_| is destroyed before |resolver_|. This is important
// since |resolver_| could be running on |thread_|.
}
int SingleThreadedProxyResolver::GetProxyForURL(const GURL& url,
ProxyInfo* results,
CompletionCallback* callback,
RequestHandle* request,
LoadLog* load_log) {
DCHECK(callback);
scoped_refptr<Job> job = new Job(this, url, results, callback, load_log);
pending_jobs_.push_back(job);
ProcessPendingJobs(); // Jobs can never finish synchronously.
// Completion will be notified through |callback|, unless the caller cancels
// the request using |request|.
if (request)
*request = reinterpret_cast<RequestHandle>(job.get());
return ERR_IO_PENDING;
}
// There are three states of the request we need to handle:
// (1) Not started (just sitting in the queue).
// (2) Executing Job::DoQuery in the worker thread.
// (3) Waiting for Job::QueryComplete to be run on the origin thread.
void SingleThreadedProxyResolver::CancelRequest(RequestHandle req) {
DCHECK(req);
Job* job = reinterpret_cast<Job*>(req);
bool is_active_job = job->is_started() && !pending_jobs_.empty() &&
pending_jobs_.front().get() == job;
job->Cancel();
if (is_active_job) {
RemoveFrontOfJobsQueueAndStartNext(job);
return;
}
// Otherwise just delete the job from the queue.
PendingJobsQueue::iterator it = std::find(
pending_jobs_.begin(), pending_jobs_.end(), job);
DCHECK(it != pending_jobs_.end());
pending_jobs_.erase(it);
}
void SingleThreadedProxyResolver::CancelSetPacScript() {
DCHECK(outstanding_set_pac_script_task_);
outstanding_set_pac_script_task_->Cancel();
outstanding_set_pac_script_task_ = NULL;
}
void SingleThreadedProxyResolver::PurgeMemory() {
if (thread_.get()) {
scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
thread_->message_loop()->PostTask(FROM_HERE,
NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
}
}
int SingleThreadedProxyResolver::SetPacScript(
const GURL& pac_url,
const std::string& pac_bytes,
CompletionCallback* callback) {
EnsureThreadStarted();
DCHECK(!outstanding_set_pac_script_task_);
SetPacScriptTask* task = new SetPacScriptTask(
this, pac_url, pac_bytes, callback);
outstanding_set_pac_script_task_ = task;
task->Start();
return ERR_IO_PENDING;
}
void SingleThreadedProxyResolver::EnsureThreadStarted() {
if (!thread_.get()) {
thread_.reset(new base::Thread("pac-thread"));
thread_->Start();
}
}
void SingleThreadedProxyResolver::ProcessPendingJobs() {
if (pending_jobs_.empty())
return;
// Get the next job to process (FIFO).
Job* job = pending_jobs_.front().get();
if (job->is_started())
return;
EnsureThreadStarted();
job->Start();
}
void SingleThreadedProxyResolver::RemoveFrontOfJobsQueueAndStartNext(
Job* expected_job) {
DCHECK_EQ(expected_job, pending_jobs_.front().get());
pending_jobs_.pop_front();
// Start next work item.
ProcessPendingJobs();
}
void SingleThreadedProxyResolver::RemoveOutstandingSetPacScriptTask(
SetPacScriptTask* task) {
DCHECK_EQ(outstanding_set_pac_script_task_.get(), task);
outstanding_set_pac_script_task_ = NULL;
}
} // namespace net