// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

#include "net/tools/flip_server/spdy_interface.h"

#include <algorithm>
#include <string>

#include "net/spdy/spdy_framer.h"
#include "net/spdy/spdy_protocol.h"
#include "net/tools/dump_cache/url_utilities.h"
#include "net/tools/flip_server/constants.h"
#include "net/tools/flip_server/flip_config.h"
#include "net/tools/flip_server/http_interface.h"
#include "net/tools/flip_server/spdy_util.h"

namespace net {

// static
std::string SpdySM::forward_ip_header_;

class SpdyFrameDataFrame : public DataFrame {
 public:
  explicit SpdyFrameDataFrame(SpdyFrame* spdy_frame) : frame(spdy_frame) {
    data = spdy_frame->data();
    size = spdy_frame->size();
  }

  virtual ~SpdyFrameDataFrame() { delete frame; }

  const SpdyFrame* frame;
};

SpdySM::SpdySM(SMConnection* connection,
               SMInterface* sm_http_interface,
               EpollServer* epoll_server,
               MemoryCache* memory_cache,
               FlipAcceptor* acceptor,
               SpdyMajorVersion spdy_version)
    : buffered_spdy_framer_(new BufferedSpdyFramer(spdy_version, true)),
      valid_spdy_session_(false),
      connection_(connection),
      client_output_list_(connection->output_list()),
      client_output_ordering_(connection),
      next_outgoing_stream_id_(2),
      epoll_server_(epoll_server),
      acceptor_(acceptor),
      memory_cache_(memory_cache),
      close_on_error_(false) {
  buffered_spdy_framer_->set_visitor(this);
}

SpdySM::~SpdySM() { }

void SpdySM::InitSMConnection(SMConnectionPoolInterface* connection_pool,
                              SMInterface* sm_interface,
                              EpollServer* epoll_server,
                              int fd,
                              std::string server_ip,
                              std::string server_port,
                              std::string remote_ip,
                              bool use_ssl) {
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Initializing server connection.";
  connection_->InitSMConnection(connection_pool,
                                sm_interface,
                                epoll_server,
                                fd,
                                server_ip,
                                server_port,
                                remote_ip,
                                use_ssl);
}

SMInterface* SpdySM::NewConnectionInterface() {
  SMConnection* server_connection =
      SMConnection::NewSMConnection(epoll_server_,
                                    NULL,
                                    memory_cache_,
                                    acceptor_,
                                    "http_conn: ");
  if (server_connection == NULL) {
    LOG(ERROR) << "SpdySM: Could not create server connection";
    return NULL;
  }
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Creating new HTTP interface";
  SMInterface* sm_http_interface =
      new HttpSM(server_connection, this, memory_cache_, acceptor_);
  return sm_http_interface;
}

SMInterface* SpdySM::FindOrMakeNewSMConnectionInterface(
    const std::string& server_ip,
    const std::string& server_port) {
  SMInterface* sm_http_interface;
  int32 server_idx;
  if (unused_server_interface_list.empty()) {
    sm_http_interface = NewConnectionInterface();
    server_idx = server_interface_list.size();
    server_interface_list.push_back(sm_http_interface);
    VLOG(2) << ACCEPTOR_CLIENT_IDENT
            << "SpdySM: Making new server connection on index: " << server_idx;
  } else {
    server_idx = unused_server_interface_list.back();
    unused_server_interface_list.pop_back();
    sm_http_interface = server_interface_list.at(server_idx);
    VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reusing connection on "
            << "index: " << server_idx;
  }

  sm_http_interface->InitSMInterface(this, server_idx);
  sm_http_interface->InitSMConnection(NULL,
                                      sm_http_interface,
                                      epoll_server_,
                                      -1,
                                      server_ip,
                                      server_port,
                                      std::string(),
                                      false);

  return sm_http_interface;
}

int SpdySM::SpdyHandleNewStream(SpdyStreamId stream_id,
                                SpdyPriority priority,
                                const SpdyHeaderBlock& headers,
                                std::string& http_data,
                                bool* is_https_scheme) {
  *is_https_scheme = false;
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSyn(" << stream_id << ")";
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: # headers: " << headers.size();

  SpdyHeaderBlock::const_iterator method = headers.end();
  SpdyHeaderBlock::const_iterator host = headers.end();
  SpdyHeaderBlock::const_iterator path = headers.end();
  SpdyHeaderBlock::const_iterator scheme = headers.end();
  SpdyHeaderBlock::const_iterator version = headers.end();
  SpdyHeaderBlock::const_iterator url = headers.end();

  std::string path_string, host_string, version_string;

  if (spdy_version() == SPDY2) {
    url = headers.find("url");
    method = headers.find("method");
    version = headers.find("version");
    scheme = headers.find("scheme");
    if (url == headers.end() || method == headers.end() ||
        version == headers.end() || scheme == headers.end()) {
      VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: A mandatory header is "
              << "missing. Not creating stream";
      return 0;
    }
    // url->second here only ever seems to contain just the path. When this
    // path contains a query string with a http:// in one of its values,
    // UrlUtilities::GetUrlPath will fail and always return a / breaking
    // the request. GetUrlPath assumes the absolute URL is being passed in.
    path_string = UrlUtilities::GetUrlPath(url->second);
    host_string = UrlUtilities::GetUrlHost(url->second);
    version_string = version->second;
  } else {
    method = headers.find(":method");
    host = headers.find(":host");
    path = headers.find(":path");
    scheme = headers.find(":scheme");
    if (method == headers.end() || host == headers.end() ||
        path == headers.end() || scheme == headers.end()) {
      VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: A mandatory header is "
              << "missing. Not creating stream";
      return 0;
    }
    host_string = host->second;
    path_string = path->second;
    version_string = "HTTP/1.1";
  }

  if (scheme->second.compare("https") == 0) {
    *is_https_scheme = true;
  }

  if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) {
    VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second
            << " " << path_string;
    std::string filename = EncodeURL(path_string,
                                     host_string,
                                     method->second);
    NewStream(stream_id, priority, filename);
  } else {
    http_data +=
        method->second + " " + path_string + " " + version_string + "\r\n";
    VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Request: " << method->second << " "
            << path_string << " " << version_string;
    http_data += "Host: " + (*is_https_scheme ?
                             acceptor_->https_server_ip_ :
                             acceptor_->http_server_ip_) + "\r\n";
    for (SpdyHeaderBlock::const_iterator i = headers.begin();
         i != headers.end(); ++i) {
      if ((i->first.size() > 0 && i->first[0] == ':') ||
          i->first == "host" ||
          i == method ||
          i == host ||
          i == path ||
          i == scheme ||
          i == version ||
          i == url) {
        // Ignore the entry.
      } else {
        http_data += i->first + ": " + i->second + "\r\n";
        VLOG(2) << ACCEPTOR_CLIENT_IDENT << i->first.c_str() << ":"
                << i->second.c_str();
      }
    }
    if (forward_ip_header_.length()) {
      // X-Client-Cluster-IP header
      http_data += forward_ip_header_ + ": " +
          connection_->client_ip() + "\r\n";
    }
    http_data += "\r\n";
  }

  VLOG(3) << ACCEPTOR_CLIENT_IDENT << "SpdySM: HTTP Request:\n" << http_data;
  return 1;
}

void SpdySM::OnStreamFrameData(SpdyStreamId stream_id,
                               const char* data,
                               size_t len,
                               bool fin) {
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: StreamData(" << stream_id
          << ", [" << len << "])";
  StreamToSmif::iterator it = stream_to_smif_.find(stream_id);
  if (it == stream_to_smif_.end()) {
    VLOG(2) << "Dropping frame from unknown stream " << stream_id;
    if (!valid_spdy_session_)
      close_on_error_ = true;
    return;
  }

  SMInterface* interface = it->second;
  if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY)
    interface->ProcessWriteInput(data, len);
}

void SpdySM::OnSynStream(SpdyStreamId stream_id,
                         SpdyStreamId associated_stream_id,
                         SpdyPriority priority,
                         bool fin,
                         bool unidirectional,
                         const SpdyHeaderBlock& headers) {
  std::string http_data;
  bool is_https_scheme;
  int ret = SpdyHandleNewStream(
      stream_id, priority, headers, http_data, &is_https_scheme);
  if (!ret) {
    LOG(ERROR) << "SpdySM: Could not convert spdy into http.";
    return;
  }
  // We've seen a valid looking SYN_STREAM, consider this to have
  // been a real spdy session.
  valid_spdy_session_ = true;

  if (acceptor_->flip_handler_type_ == FLIP_HANDLER_PROXY) {
    std::string server_ip;
    std::string server_port;
    if (is_https_scheme) {
      server_ip = acceptor_->https_server_ip_;
      server_port = acceptor_->https_server_port_;
    } else {
      server_ip = acceptor_->http_server_ip_;
      server_port = acceptor_->http_server_port_;
    }
    SMInterface* sm_http_interface =
        FindOrMakeNewSMConnectionInterface(server_ip, server_port);
    stream_to_smif_[stream_id] = sm_http_interface;
    sm_http_interface->SetStreamID(stream_id);
    sm_http_interface->ProcessWriteInput(http_data.c_str(), http_data.size());
  }
}

void SpdySM::OnSynReply(SpdyStreamId stream_id,
                        bool fin,
                        const SpdyHeaderBlock& headers) {
  // TODO(willchan): if there is an error parsing headers, we
  // should send a RST_STREAM.
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnSynReply(" << stream_id << ")";
}

void SpdySM::OnHeaders(SpdyStreamId stream_id,
                       bool fin,
                       const SpdyHeaderBlock& headers) {
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnHeaders(" << stream_id << ")";
}

void SpdySM::OnRstStream(SpdyStreamId stream_id, SpdyRstStreamStatus status) {
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: OnRstStream(" << stream_id
          << ")";
  client_output_ordering_.RemoveStreamId(stream_id);
}

size_t SpdySM::ProcessReadInput(const char* data, size_t len) {
  DCHECK(buffered_spdy_framer_);
  return buffered_spdy_framer_->ProcessInput(data, len);
}

size_t SpdySM::ProcessWriteInput(const char* data, size_t len) { return 0; }

bool SpdySM::MessageFullyRead() const {
  DCHECK(buffered_spdy_framer_);
  return buffered_spdy_framer_->MessageFullyRead();
}

bool SpdySM::Error() const {
  DCHECK(buffered_spdy_framer_);
  return close_on_error_ || buffered_spdy_framer_->HasError();
}

const char* SpdySM::ErrorAsString() const {
  DCHECK(Error());
  DCHECK(buffered_spdy_framer_);
  return SpdyFramer::ErrorCodeToString(buffered_spdy_framer_->error_code());
}

void SpdySM::ResetForNewInterface(int32 server_idx) {
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Reset for new interface: "
          << "server_idx: " << server_idx;
  unused_server_interface_list.push_back(server_idx);
}

void SpdySM::ResetForNewConnection() {
  // seq_num is not cleared, intentionally.
  buffered_spdy_framer_.reset();
  valid_spdy_session_ = false;
  client_output_ordering_.Reset();
  next_outgoing_stream_id_ = 2;
}

// Send a settings frame
int SpdySM::PostAcceptHook() {
  // We should have buffered_spdy_framer_ set after reuse
  DCHECK(buffered_spdy_framer_);
  SettingsMap settings;
  settings[SETTINGS_MAX_CONCURRENT_STREAMS] =
      SettingsFlagsAndValue(SETTINGS_FLAG_NONE, 100);
  SpdyFrame* settings_frame = buffered_spdy_framer_->CreateSettings(settings);

  VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending Settings Frame";
  EnqueueDataFrame(new SpdyFrameDataFrame(settings_frame));
  return 1;
}

void SpdySM::NewStream(uint32 stream_id,
                       uint32 priority,
                       const std::string& filename) {
  MemCacheIter mci;
  mci.stream_id = stream_id;
  mci.priority = priority;
  // TODO(yhirano): The program will crash when
  // acceptor_->flip_handler_type_ != FLIP_HANDLER_SPDY_SERVER.
  // It should be fixed or an assertion should be placed.
  if (acceptor_->flip_handler_type_ == FLIP_HANDLER_SPDY_SERVER) {
    if (!memory_cache_->AssignFileData(filename, &mci)) {
      // error creating new stream.
      VLOG(1) << ACCEPTOR_CLIENT_IDENT << "Sending ErrorNotFound";
      SendErrorNotFound(stream_id);
    } else {
      AddToOutputOrder(mci);
    }
  } else {
    AddToOutputOrder(mci);
  }
}

void SpdySM::AddToOutputOrder(const MemCacheIter& mci) {
  client_output_ordering_.AddToOutputOrder(mci);
}

void SpdySM::SendEOF(uint32 stream_id) { SendEOFImpl(stream_id); }

void SpdySM::SendErrorNotFound(uint32 stream_id) {
  SendErrorNotFoundImpl(stream_id);
}

size_t SpdySM::SendSynStream(uint32 stream_id, const BalsaHeaders& headers) {
  return SendSynStreamImpl(stream_id, headers);
}

size_t SpdySM::SendSynReply(uint32 stream_id, const BalsaHeaders& headers) {
  return SendSynReplyImpl(stream_id, headers);
}

void SpdySM::SendDataFrame(uint32 stream_id,
                           const char* data,
                           int64 len,
                           uint32 flags,
                           bool compress) {
  SpdyDataFlags spdy_flags = static_cast<SpdyDataFlags>(flags);
  SendDataFrameImpl(stream_id, data, len, spdy_flags, compress);
}

void SpdySM::SendEOFImpl(uint32 stream_id) {
  SendDataFrame(stream_id, NULL, 0, DATA_FLAG_FIN, false);
  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending EOF: " << stream_id;
  KillStream(stream_id);
  stream_to_smif_.erase(stream_id);
}

void SpdySM::SendErrorNotFoundImpl(uint32 stream_id) {
  BalsaHeaders my_headers;
  my_headers.SetFirstlineFromStringPieces("HTTP/1.1", "404", "Not Found");
  SendSynReplyImpl(stream_id, my_headers);
  SendDataFrame(stream_id, "wtf?", 4, DATA_FLAG_FIN, false);
  client_output_ordering_.RemoveStreamId(stream_id);
}

void SpdySM::KillStream(uint32 stream_id) {
  client_output_ordering_.RemoveStreamId(stream_id);
}

void SpdySM::CopyHeaders(SpdyHeaderBlock& dest, const BalsaHeaders& headers) {
  for (BalsaHeaders::const_header_lines_iterator hi =
           headers.header_lines_begin();
       hi != headers.header_lines_end();
       ++hi) {
    // It is illegal to send SPDY headers with empty value or header
    // names.
    if (!hi->first.length() || !hi->second.length())
      continue;

    // Key must be all lower case in SPDY headers.
    std::string key = hi->first.as_string();
    std::transform(key.begin(), key.end(), key.begin(), ::tolower);
    SpdyHeaderBlock::iterator fhi = dest.find(key);
    if (fhi == dest.end()) {
      dest[key] = hi->second.as_string();
    } else {
      dest[key] = (std::string(fhi->second.data(), fhi->second.size()) + "\0" +
                   std::string(hi->second.data(), hi->second.size()));
    }
  }

  // These headers have no value
  dest.erase("X-Associated-Content");  // TODO(mbelshe): case-sensitive
  dest.erase("X-Original-Url");        // TODO(mbelshe): case-sensitive
}

size_t SpdySM::SendSynStreamImpl(uint32 stream_id,
                                 const BalsaHeaders& headers) {
  SpdyHeaderBlock block;
  CopyHeaders(block, headers);
  if (spdy_version() == SPDY2) {
    block["method"] = headers.request_method().as_string();
    if (!headers.HasHeader("version"))
      block["version"] = headers.request_version().as_string();
    if (headers.HasHeader("X-Original-Url")) {
      std::string original_url =
          headers.GetHeader("X-Original-Url").as_string();
      block["url"] = UrlUtilities::GetUrlPath(original_url);
    } else {
      block["url"] = headers.request_uri().as_string();
    }
  } else {
    block[":method"] = headers.request_method().as_string();
    block[":version"] = headers.request_version().as_string();
    if (headers.HasHeader("X-Original-Url")) {
      std::string original_url =
          headers.GetHeader("X-Original-Url").as_string();
      block[":path"] = UrlUtilities::GetUrlPath(original_url);
      block[":host"] = UrlUtilities::GetUrlPath(original_url);
    } else {
      block[":path"] = headers.request_uri().as_string();
      if (block.find("host") != block.end()) {
        block[":host"] = headers.GetHeader("Host").as_string();
        block.erase("host");
      }
    }
  }

  DCHECK(buffered_spdy_framer_);
  SpdyFrame* fsrcf = buffered_spdy_framer_->CreateSynStream(
      stream_id, 0, 0, CONTROL_FLAG_NONE, &block);
  size_t df_size = fsrcf->size();
  EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf));

  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynStreamheader "
          << stream_id;
  return df_size;
}

size_t SpdySM::SendSynReplyImpl(uint32 stream_id, const BalsaHeaders& headers) {
  SpdyHeaderBlock block;
  CopyHeaders(block, headers);
  if (spdy_version() == SPDY2) {
    block["status"] = headers.response_code().as_string() + " " +
        headers.response_reason_phrase().as_string();
    block["version"] = headers.response_version().as_string();
  } else {
    block[":status"] = headers.response_code().as_string() + " " +
        headers.response_reason_phrase().as_string();
    block[":version"] = headers.response_version().as_string();
  }

  DCHECK(buffered_spdy_framer_);
  SpdyFrame* fsrcf = buffered_spdy_framer_->CreateSynReply(
      stream_id, CONTROL_FLAG_NONE, &block);
  size_t df_size = fsrcf->size();
  EnqueueDataFrame(new SpdyFrameDataFrame(fsrcf));

  VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending SynReplyheader "
          << stream_id;
  return df_size;
}

void SpdySM::SendDataFrameImpl(uint32 stream_id,
                               const char* data,
                               int64 len,
                               SpdyDataFlags flags,
                               bool compress) {
  DCHECK(buffered_spdy_framer_);
  // TODO(mbelshe):  We can't compress here - before going into the
  //                 priority queue.  Compression needs to be done
  //                 with late binding.
  if (len == 0) {
    SpdyFrame* fdf =
        buffered_spdy_framer_->CreateDataFrame(stream_id, data, len, flags);
    EnqueueDataFrame(new SpdyFrameDataFrame(fdf));
    return;
  }

  // Chop data frames into chunks so that one stream can't monopolize the
  // output channel.
  while (len > 0) {
    int64 size = std::min(len, static_cast<int64>(kSpdySegmentSize));
    SpdyDataFlags chunk_flags = flags;

    // If we chunked this block, and the FIN flag was set, there is more
    // data coming.  So, remove the flag.
    if ((size < len) && (flags & DATA_FLAG_FIN))
      chunk_flags = static_cast<SpdyDataFlags>(chunk_flags & ~DATA_FLAG_FIN);

    SpdyFrame* fdf = buffered_spdy_framer_->CreateDataFrame(
        stream_id, data, size, chunk_flags);
    EnqueueDataFrame(new SpdyFrameDataFrame(fdf));

    VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: Sending data frame "
            << stream_id << " [" << size << "] shrunk to "
            << (fdf->size() - kSpdyOverhead) << ", flags=" << flags;

    data += size;
    len -= size;
  }
}

void SpdySM::EnqueueDataFrame(DataFrame* df) {
  connection_->EnqueueDataFrame(df);
}

void SpdySM::GetOutput() {
  while (client_output_list_->size() < 2) {
    MemCacheIter* mci = client_output_ordering_.GetIter();
    if (mci == NULL) {
      VLOG(2) << ACCEPTOR_CLIENT_IDENT
              << "SpdySM: GetOutput: nothing to output!?";
      return;
    }
    if (!mci->transformed_header) {
      mci->transformed_header = true;
      VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput transformed "
              << "header stream_id: [" << mci->stream_id << "]";
      if ((mci->stream_id % 2) == 0) {
        // this is a server initiated stream.
        // Ideally, we'd do a 'syn-push' here, instead of a syn-reply.
        BalsaHeaders headers;
        headers.CopyFrom(*(mci->file_data->headers()));
        headers.ReplaceOrAppendHeader("status", "200");
        headers.ReplaceOrAppendHeader("version", "http/1.1");
        headers.SetRequestFirstlineFromStringPieces(
            "PUSH", mci->file_data->filename(), "");
        mci->bytes_sent = SendSynStream(mci->stream_id, headers);
      } else {
        BalsaHeaders headers;
        headers.CopyFrom(*(mci->file_data->headers()));
        mci->bytes_sent = SendSynReply(mci->stream_id, headers);
      }
      return;
    }
    if (mci->body_bytes_consumed >= mci->file_data->body().size()) {
      VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput "
              << "remove_stream_id: [" << mci->stream_id << "]";
      SendEOF(mci->stream_id);
      return;
    }
    size_t num_to_write =
        mci->file_data->body().size() - mci->body_bytes_consumed;
    if (num_to_write > mci->max_segment_size)
      num_to_write = mci->max_segment_size;

    bool should_compress = false;
    if (!mci->file_data->headers()->HasHeader("content-encoding")) {
      if (mci->file_data->headers()->HasHeader("content-type")) {
        std::string content_type =
            mci->file_data->headers()->GetHeader("content-type").as_string();
        if (content_type.find("image") == content_type.npos)
          should_compress = true;
      }
    }

    SendDataFrame(mci->stream_id,
                  mci->file_data->body().data() + mci->body_bytes_consumed,
                  num_to_write,
                  0,
                  should_compress);
    VLOG(2) << ACCEPTOR_CLIENT_IDENT << "SpdySM: GetOutput SendDataFrame["
            << mci->stream_id << "]: " << num_to_write;
    mci->body_bytes_consumed += num_to_write;
    mci->bytes_sent += num_to_write;
  }
}

void SpdySM::CreateFramer(SpdyMajorVersion spdy_version) {
  DCHECK(!buffered_spdy_framer_);
  buffered_spdy_framer_.reset(new BufferedSpdyFramer(spdy_version, true));
  buffered_spdy_framer_->set_visitor(this);
}

}  // namespace net