// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "webkit/glue/media/buffered_data_source.h"

#include "media/base/filter_host.h"
#include "net/base/net_errors.h"
#include "webkit/glue/media/web_data_source_factory.h"
#include "webkit/glue/webkit_glue.h"

using WebKit::WebFrame;

namespace webkit_glue {

// Defines how long we should wait for more data before we declare a connection
// timeout and start a new request.
// TODO(hclam): Set it to 5s, calibrate this value later.
static const int kTimeoutMilliseconds = 5000;

// Defines how many times we should try to read from a buffered resource loader
// before we declare a read error. After each failure of read from a buffered
// resource loader, a new one is created to be read.
static const int kReadTrials = 3;

// BufferedDataSource has an intermediate buffer, this value governs the initial
// size of that buffer. It is set to 32KB because this is a typical read size
// of FFmpeg.
static const int kInitialReadBufferSize = 32768;

static WebDataSource* NewBufferedDataSource(MessageLoop* render_loop,
                                            WebKit::WebFrame* frame) {
  return new BufferedDataSource(render_loop, frame);
}

// static
media::DataSourceFactory* BufferedDataSource::CreateFactory(
    MessageLoop* render_loop,
    WebKit::WebFrame* frame,
    WebDataSourceBuildObserverHack* build_observer) {
  return new WebDataSourceFactory(render_loop, frame, &NewBufferedDataSource,
                                  build_observer);
}

BufferedDataSource::BufferedDataSource(
    MessageLoop* render_loop,
    WebFrame* frame)
    : total_bytes_(kPositionNotSpecified),
      buffered_bytes_(0),
      loaded_(false),
      streaming_(false),
      frame_(frame),
      loader_(NULL),
      network_activity_(false),
      initialize_callback_(NULL),
      read_callback_(NULL),
      read_position_(0),
      read_size_(0),
      read_buffer_(NULL),
      read_attempts_(0),
      intermediate_read_buffer_(new uint8[kInitialReadBufferSize]),
      intermediate_read_buffer_size_(kInitialReadBufferSize),
      render_loop_(render_loop),
      stop_signal_received_(false),
      stopped_on_render_loop_(false),
      media_is_paused_(true),
      media_has_played_(false),
      preload_(media::METADATA),
      using_range_request_(true) {
}

BufferedDataSource::~BufferedDataSource() {
}

// A factory method to create BufferedResourceLoader using the read parameters.
// This method can be overrided to inject mock BufferedResourceLoader object
// for testing purpose.
BufferedResourceLoader* BufferedDataSource::CreateResourceLoader(
    int64 first_byte_position, int64 last_byte_position) {
  DCHECK(MessageLoop::current() == render_loop_);

  return new BufferedResourceLoader(url_,
                                    first_byte_position,
                                    last_byte_position);
}

// This method simply returns kTimeoutMilliseconds. The purpose of this
// method is to be overidded so as to provide a different timeout value
// for testing purpose.
base::TimeDelta BufferedDataSource::GetTimeoutMilliseconds() {
  return base::TimeDelta::FromMilliseconds(kTimeoutMilliseconds);
}

void BufferedDataSource::set_host(media::FilterHost* host) {
  DataSource::set_host(host);

  if (loader_.get())
    UpdateHostState();
}

void BufferedDataSource::Initialize(const std::string& url,
                                    media::PipelineStatusCallback* callback) {
  // Saves the url.
  url_ = GURL(url);

  // This data source doesn't support data:// protocol so reject it.
  if (url_.SchemeIs(kDataScheme)) {
    callback->Run(media::DATASOURCE_ERROR_URL_NOT_SUPPORTED);
    delete callback;
    return;
  } else if (!IsProtocolSupportedForMedia(url_)) {
    callback->Run(media::PIPELINE_ERROR_NETWORK);
    delete callback;
    return;
  }

  DCHECK(callback);
  initialize_callback_.reset(callback);

  media_format_.SetAsString(media::MediaFormat::kURL, url);

  // Post a task to complete the initialization task.
  render_loop_->PostTask(FROM_HERE,
      NewRunnableMethod(this, &BufferedDataSource::InitializeTask));
}

void BufferedDataSource::CancelInitialize() {
  base::AutoLock auto_lock(lock_);
  DCHECK(initialize_callback_.get());

  initialize_callback_.reset();

  render_loop_->PostTask(
      FROM_HERE, NewRunnableMethod(this, &BufferedDataSource::CleanupTask));
}

/////////////////////////////////////////////////////////////////////////////
// media::Filter implementation.
void BufferedDataSource::Stop(media::FilterCallback* callback) {
  {
    base::AutoLock auto_lock(lock_);
    stop_signal_received_ = true;
  }
  if (callback) {
    callback->Run();
    delete callback;
  }

  render_loop_->PostTask(FROM_HERE,
      NewRunnableMethod(this, &BufferedDataSource::CleanupTask));
}

void BufferedDataSource::SetPlaybackRate(float playback_rate) {
  render_loop_->PostTask(FROM_HERE,
      NewRunnableMethod(this, &BufferedDataSource::SetPlaybackRateTask,
                        playback_rate));
}

void BufferedDataSource::SetPreload(media::Preload preload) {
  render_loop_->PostTask(FROM_HERE,
      NewRunnableMethod(this, &BufferedDataSource::SetPreloadTask,
                        preload));
}

/////////////////////////////////////////////////////////////////////////////
// media::DataSource implementation.
void BufferedDataSource::Read(int64 position, size_t size, uint8* data,
                              media::DataSource::ReadCallback* read_callback) {
  DCHECK(read_callback);

  {
    base::AutoLock auto_lock(lock_);
    DCHECK(!read_callback_.get());

    if (stop_signal_received_ || stopped_on_render_loop_) {
      read_callback->RunWithParams(
          Tuple1<size_t>(static_cast<size_t>(media::DataSource::kReadError)));
      delete read_callback;
      return;
    }

    read_callback_.reset(read_callback);
  }

  render_loop_->PostTask(FROM_HERE,
      NewRunnableMethod(this, &BufferedDataSource::ReadTask,
                        position, static_cast<int>(size), data));
}

bool BufferedDataSource::GetSize(int64* size_out) {
  if (total_bytes_ != kPositionNotSpecified) {
    *size_out = total_bytes_;
    return true;
  }
  *size_out = 0;
  return false;
}

bool BufferedDataSource::IsStreaming() {
  return streaming_;
}

bool BufferedDataSource::HasSingleOrigin() {
  DCHECK(MessageLoop::current() == render_loop_);
  return loader_.get() ? loader_->HasSingleOrigin() : true;
}

void BufferedDataSource::Abort() {
  DCHECK(MessageLoop::current() == render_loop_);

  CleanupTask();
  frame_ = NULL;
}

/////////////////////////////////////////////////////////////////////////////
// Render thread tasks.
void BufferedDataSource::InitializeTask() {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(!loader_.get());
  if (stopped_on_render_loop_ || !initialize_callback_.get())
    return;

  // Kick starts the watch dog task that will handle connection timeout.
  // We run the watch dog 2 times faster the actual timeout so as to catch
  // the timeout more accurately.
  watch_dog_timer_.Start(
      GetTimeoutMilliseconds() / 2,
      this,
      &BufferedDataSource::WatchDogTask);

  if (url_.SchemeIs(kHttpScheme) || url_.SchemeIs(kHttpsScheme)) {
    // Do an unbounded range request starting at the beginning.  If the server
    // responds with 200 instead of 206 we'll fall back into a streaming mode.
    loader_ = CreateResourceLoader(0, kPositionNotSpecified);
    loader_->Start(
        NewCallback(this, &BufferedDataSource::HttpInitialStartCallback),
        NewCallback(this, &BufferedDataSource::NetworkEventCallback),
        frame_);
  } else {
    // For all other protocols, assume they support range request. We fetch
    // the full range of the resource to obtain the instance size because
    // we won't be served HTTP headers.
    loader_ = CreateResourceLoader(kPositionNotSpecified,
                                   kPositionNotSpecified);
    loader_->Start(
        NewCallback(this, &BufferedDataSource::NonHttpInitialStartCallback),
        NewCallback(this, &BufferedDataSource::NetworkEventCallback),
        frame_);
  }
}

void BufferedDataSource::ReadTask(
    int64 position,
    int read_size,
    uint8* buffer) {
  DCHECK(MessageLoop::current() == render_loop_);
  {
    base::AutoLock auto_lock(lock_);
    if (stopped_on_render_loop_)
      return;

    DCHECK(read_callback_.get());
  }

  // Saves the read parameters.
  read_position_ = position;
  read_size_ = read_size;
  read_buffer_ = buffer;
  read_submitted_time_ = base::Time::Now();
  read_attempts_ = 0;

  // Call to read internal to perform the actual read.
  ReadInternal();
}

void BufferedDataSource::CleanupTask() {
  DCHECK(MessageLoop::current() == render_loop_);

  {
    base::AutoLock auto_lock(lock_);
    if (stopped_on_render_loop_)
      return;

    // Signal that stop task has finished execution.
    // NOTE: it's vital that this be set under lock, as that's how Read() tests
    // before registering a new |read_callback_| (which is cleared below).
    stopped_on_render_loop_ = true;

    if (read_callback_.get())
      DoneRead_Locked(net::ERR_FAILED);
  }

  // Stop the watch dog.
  watch_dog_timer_.Stop();

  // We just need to stop the loader, so it stops activity.
  if (loader_.get())
    loader_->Stop();

  // Reset the parameters of the current read request.
  read_position_ = 0;
  read_size_ = 0;
  read_buffer_ = 0;
  read_submitted_time_ = base::Time();
  read_attempts_ = 0;
}

void BufferedDataSource::RestartLoadingTask() {
  DCHECK(MessageLoop::current() == render_loop_);
  if (stopped_on_render_loop_)
    return;

  {
    // If there's no outstanding read then return early.
    base::AutoLock auto_lock(lock_);
    if (!read_callback_.get())
      return;
  }

  loader_ = CreateResourceLoader(read_position_, kPositionNotSpecified);
  BufferedResourceLoader::DeferStrategy strategy = ChooseDeferStrategy();
  loader_->UpdateDeferStrategy(strategy);
  loader_->Start(
      NewCallback(this, &BufferedDataSource::PartialReadStartCallback),
      NewCallback(this, &BufferedDataSource::NetworkEventCallback),
      frame_);
}

void BufferedDataSource::WatchDogTask() {
  DCHECK(MessageLoop::current() == render_loop_);
  if (stopped_on_render_loop_)
    return;

  // We only care if there is an active read request.
  {
    base::AutoLock auto_lock(lock_);
    if (!read_callback_.get())
      return;
  }

  DCHECK(loader_.get());
  base::TimeDelta delta = base::Time::Now() - read_submitted_time_;
  if (delta < GetTimeoutMilliseconds())
    return;

  // TODO(hclam): Maybe raise an error here. But if an error is reported
  // the whole pipeline may get destroyed...
  if (read_attempts_ >= kReadTrials)
    return;

  ++read_attempts_;
  read_submitted_time_ = base::Time::Now();

  // Stops the current loader and creates a new resource loader and
  // retry the request.
  loader_->Stop();
  loader_ = CreateResourceLoader(read_position_, kPositionNotSpecified);
  BufferedResourceLoader::DeferStrategy strategy = ChooseDeferStrategy();
  loader_->UpdateDeferStrategy(strategy);
  loader_->Start(
      NewCallback(this, &BufferedDataSource::PartialReadStartCallback),
      NewCallback(this, &BufferedDataSource::NetworkEventCallback),
      frame_);
}

void BufferedDataSource::SetPlaybackRateTask(float playback_rate) {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(loader_.get());

  bool previously_paused = media_is_paused_;
  media_is_paused_ = (playback_rate == 0.0);

  if (!media_has_played_ && previously_paused && !media_is_paused_)
    media_has_played_ = true;

  BufferedResourceLoader::DeferStrategy strategy = ChooseDeferStrategy();
  loader_->UpdateDeferStrategy(strategy);
}

void BufferedDataSource::SetPreloadTask(media::Preload preload) {
  DCHECK(MessageLoop::current() == render_loop_);
  preload_ = preload;
}

BufferedResourceLoader::DeferStrategy
BufferedDataSource::ChooseDeferStrategy() {
  // If the user indicates preload=metadata, then just load exactly
  // what is needed for starting the pipeline and prerolling frames.
  if (preload_ == media::METADATA && !media_has_played_)
    return BufferedResourceLoader::kReadThenDefer;

  // In general, we want to try to buffer the entire video when the video
  // is paused. But we don't want to do this if the video hasn't played yet
  // and preload!=auto.
  if (media_is_paused_ &&
      (preload_ == media::AUTO || media_has_played_)) {
    return BufferedResourceLoader::kNeverDefer;
  }

  // When the video is playing, regardless of preload state, we buffer up
  // to a hard limit and enable/disable deferring when the buffer is
  // depleted/full.
  return BufferedResourceLoader::kThresholdDefer;
}

// This method is the place where actual read happens, |loader_| must be valid
// prior to make this method call.
void BufferedDataSource::ReadInternal() {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(loader_);

  // First we prepare the intermediate read buffer for BufferedResourceLoader
  // to write to.
  if (read_size_ > intermediate_read_buffer_size_) {
    intermediate_read_buffer_.reset(new uint8[read_size_]);
  }

  // Perform the actual read with BufferedResourceLoader.
  loader_->Read(read_position_, read_size_, intermediate_read_buffer_.get(),
                NewCallback(this, &BufferedDataSource::ReadCallback));
}

// Method to report the results of the current read request. Also reset all
// the read parameters.
void BufferedDataSource::DoneRead_Locked(int error) {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(read_callback_.get());
  lock_.AssertAcquired();

  if (error >= 0) {
    read_callback_->RunWithParams(Tuple1<size_t>(error));
  } else {
    read_callback_->RunWithParams(
        Tuple1<size_t>(static_cast<size_t>(media::DataSource::kReadError)));
  }

  read_callback_.reset();
  read_position_ = 0;
  read_size_ = 0;
  read_buffer_ = 0;
}

void BufferedDataSource::DoneInitialization_Locked(
    media::PipelineStatus status) {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(initialize_callback_.get());
  lock_.AssertAcquired();

  scoped_ptr<media::PipelineStatusCallback> initialize_callback(
      initialize_callback_.release());
  initialize_callback->Run(status);
}

/////////////////////////////////////////////////////////////////////////////
// BufferedResourceLoader callback methods.
void BufferedDataSource::HttpInitialStartCallback(int error) {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(loader_.get());

  int64 instance_size = loader_->instance_size();
  bool success = error == net::OK;

  if (!initialize_callback_.get()) {
    loader_->Stop();
    return;
  }

  if (success) {
    // TODO(hclam): Needs more thinking about supporting servers without range
    // request or their partial response is not complete.
    total_bytes_ = instance_size;
    loaded_ = false;
    streaming_ = (instance_size == kPositionNotSpecified) ||
        !loader_->range_supported();
  } else {
    // TODO(hclam): In case of failure, we can retry several times.
    loader_->Stop();
  }

  if (error == net::ERR_INVALID_RESPONSE && using_range_request_) {
    // Assuming that the Range header was causing the problem. Retry without
    // the Range header.
    using_range_request_ = false;
    loader_ = CreateResourceLoader(kPositionNotSpecified,
                                   kPositionNotSpecified);
    loader_->Start(
        NewCallback(this, &BufferedDataSource::HttpInitialStartCallback),
        NewCallback(this, &BufferedDataSource::NetworkEventCallback),
        frame_);
    return;
  }

  // Reference to prevent destruction while inside the |initialize_callback_|
  // call. This is a temporary fix to prevent crashes caused by holding the
  // lock and running the destructor.
  // TODO: Review locking in this class and figure out a way to run the callback
  //       w/o the lock.
  scoped_refptr<BufferedDataSource> destruction_guard(this);
  {
    // We need to prevent calling to filter host and running the callback if
    // we have received the stop signal. We need to lock down the whole callback
    // method to prevent bad things from happening. The reason behind this is
    // that we cannot guarantee tasks on render thread have completely stopped
    // when we receive the Stop() method call. The only way to solve this is to
    // let tasks on render thread to run but make sure they don't call outside
    // this object when Stop() method is ever called. Locking this method is
    // safe because |lock_| is only acquired in tasks on render thread.
    base::AutoLock auto_lock(lock_);
    if (stop_signal_received_)
      return;

    if (!success) {
      DoneInitialization_Locked(media::PIPELINE_ERROR_NETWORK);
      return;
    }

    UpdateHostState();
    DoneInitialization_Locked(media::PIPELINE_OK);
  }
}

void BufferedDataSource::NonHttpInitialStartCallback(int error) {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(loader_.get());

  if (!initialize_callback_.get()) {
    loader_->Stop();
    return;
  }

  int64 instance_size = loader_->instance_size();
  bool success = error == net::OK && instance_size != kPositionNotSpecified;

  if (success) {
    total_bytes_ = instance_size;
    buffered_bytes_ = total_bytes_;
    loaded_ = true;
  } else {
    loader_->Stop();
  }

  // Reference to prevent destruction while inside the |initialize_callback_|
  // call. This is a temporary fix to prevent crashes caused by holding the
  // lock and running the destructor.
  // TODO: Review locking in this class and figure out a way to run the callback
  //       w/o the lock.
  scoped_refptr<BufferedDataSource> destruction_guard(this);
  {
    // We need to prevent calling to filter host and running the callback if
    // we have received the stop signal. We need to lock down the whole callback
    // method to prevent bad things from happening. The reason behind this is
    // that we cannot guarantee tasks on render thread have completely stopped
    // when we receive the Stop() method call. The only way to solve this is to
    // let tasks on render thread to run but make sure they don't call outside
    // this object when Stop() method is ever called. Locking this method is
    // safe because |lock_| is only acquired in tasks on render thread.
    base::AutoLock auto_lock(lock_);
    if (stop_signal_received_ || !initialize_callback_.get())
      return;

    if (!success) {
      DoneInitialization_Locked(media::PIPELINE_ERROR_NETWORK);
      return;
    }

    UpdateHostState();
    DoneInitialization_Locked(media::PIPELINE_OK);
  }
}

void BufferedDataSource::PartialReadStartCallback(int error) {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(loader_.get());

  if (error == net::OK) {
    // Once the request has started successfully, we can proceed with
    // reading from it.
    ReadInternal();
    return;
  }

  // Stop the resource loader since we have received an error.
  loader_->Stop();

  // We need to prevent calling to filter host and running the callback if
  // we have received the stop signal. We need to lock down the whole callback
  // method to prevent bad things from happening. The reason behind this is
  // that we cannot guarantee tasks on render thread have completely stopped
  // when we receive the Stop() method call. So only way to solve this is to
  // let tasks on render thread to run but make sure they don't call outside
  // this object when Stop() method is ever called. Locking this method is
  // safe because |lock_| is only acquired in tasks on render thread.
  base::AutoLock auto_lock(lock_);
  if (stop_signal_received_)
    return;
  DoneRead_Locked(net::ERR_INVALID_RESPONSE);
}

void BufferedDataSource::ReadCallback(int error) {
  DCHECK(MessageLoop::current() == render_loop_);

  if (error < 0) {
    DCHECK(loader_.get());

    // Stop the resource load if it failed.
    loader_->Stop();

    if (error == net::ERR_CACHE_MISS) {
      render_loop_->PostTask(FROM_HERE,
          NewRunnableMethod(this, &BufferedDataSource::RestartLoadingTask));
      return;
    }
  }

  // We need to prevent calling to filter host and running the callback if
  // we have received the stop signal. We need to lock down the whole callback
  // method to prevent bad things from happening. The reason behind this is
  // that we cannot guarantee tasks on render thread have completely stopped
  // when we receive the Stop() method call. So only way to solve this is to
  // let tasks on render thread to run but make sure they don't call outside
  // this object when Stop() method is ever called. Locking this method is safe
  // because |lock_| is only acquired in tasks on render thread.
  base::AutoLock auto_lock(lock_);
  if (stop_signal_received_)
    return;

  if (error > 0) {
    // If a position error code is received, read was successful. So copy
    // from intermediate read buffer to the target read buffer.
    memcpy(read_buffer_, intermediate_read_buffer_.get(), error);
  } else if (error == 0 && total_bytes_ == kPositionNotSpecified) {
    // We've reached the end of the file and we didn't know the total size
    // before. Update the total size so Read()s past the end of the file will
    // fail like they would if we had known the file size at the beginning.
    total_bytes_ = loader_->instance_size();

    if (host() && total_bytes_ != kPositionNotSpecified)
      host()->SetTotalBytes(total_bytes_);
  }
  DoneRead_Locked(error);
}

void BufferedDataSource::NetworkEventCallback() {
  DCHECK(MessageLoop::current() == render_loop_);
  DCHECK(loader_.get());

  // In case of non-HTTP request we don't need to report network events,
  // so return immediately.
  if (loaded_)
    return;

  bool network_activity = loader_->network_activity();
  int64 buffered_position = loader_->GetBufferedPosition();

  // If we get an unspecified value, return immediately.
  if (buffered_position == kPositionNotSpecified)
    return;

  // We need to prevent calling to filter host and running the callback if
  // we have received the stop signal. We need to lock down the whole callback
  // method to prevent bad things from happening. The reason behind this is
  // that we cannot guarantee tasks on render thread have completely stopped
  // when we receive the Stop() method call. So only way to solve this is to
  // let tasks on render thread to run but make sure they don't call outside
  // this object when Stop() method is ever called. Locking this method is safe
  // because |lock_| is only acquired in tasks on render thread.
  base::AutoLock auto_lock(lock_);
  if (stop_signal_received_)
    return;

  if (network_activity != network_activity_) {
    network_activity_ = network_activity;
    if (host())
      host()->SetNetworkActivity(network_activity);
  }

  buffered_bytes_ = buffered_position + 1;
  if (host())
    host()->SetBufferedBytes(buffered_bytes_);
}

void BufferedDataSource::UpdateHostState() {
  media::FilterHost* filter_host = host();
  if (!filter_host)
    return;

  filter_host->SetLoaded(loaded_);

  if (streaming_) {
    filter_host->SetStreaming(true);
  } else {
    filter_host->SetTotalBytes(total_bytes_);
    filter_host->SetBufferedBytes(buffered_bytes_);
  }
}

}  // namespace webkit_glue