// Copyright (c) 2011 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/sync/notifier/cache_invalidation_packet_handler.h"
#include <string>
#include "base/base64.h"
#include "base/callback.h"
#include "base/compiler_specific.h"
#include "base/logging.h"
#include "base/rand_util.h"
#include "base/string_number_conversions.h"
#include "google/cacheinvalidation/invalidation-client.h"
#include "jingle/notifier/listener/xml_element_util.h"
#include "talk/xmpp/constants.h"
#include "talk/xmpp/jid.h"
#include "talk/xmpp/xmppclient.h"
#include "talk/xmpp/xmpptask.h"
namespace sync_notifier {
namespace {
const char kBotJid[] = "tango@bot.talk.google.com";
const char kServiceUrl[] = "http://www.google.com/chrome/sync";
const buzz::QName kQnData("google:notifier", "data");
const buzz::QName kQnSeq("", "seq");
const buzz::QName kQnSid("", "sid");
const buzz::QName kQnServiceUrl("", "serviceUrl");
// TODO(akalin): Move these task classes out so that they can be
// unit-tested. This'll probably be done easier once we consolidate
// all the packet sending/receiving classes.
// A task that listens for ClientInvalidation messages and calls the
// given callback on them.
class CacheInvalidationListenTask : public buzz::XmppTask {
public:
// Takes ownership of callback.
CacheInvalidationListenTask(Task* parent,
Callback1<const std::string&>::Type* callback)
: XmppTask(parent, buzz::XmppEngine::HL_TYPE), callback_(callback) {}
virtual ~CacheInvalidationListenTask() {}
virtual int ProcessStart() {
VLOG(2) << "CacheInvalidationListenTask started";
return STATE_RESPONSE;
}
virtual int ProcessResponse() {
const buzz::XmlElement* stanza = NextStanza();
if (stanza == NULL) {
VLOG(2) << "CacheInvalidationListenTask blocked";
return STATE_BLOCKED;
}
VLOG(2) << "CacheInvalidationListenTask response received";
std::string data;
if (GetCacheInvalidationIqPacketData(stanza, &data)) {
callback_->Run(data);
} else {
LOG(ERROR) << "Could not get packet data";
}
// Acknowledge receipt of the iq to the buzz server.
// TODO(akalin): Send an error response for malformed packets.
scoped_ptr<buzz::XmlElement> response_stanza(MakeIqResult(stanza));
SendStanza(response_stanza.get());
return STATE_RESPONSE;
}
virtual bool HandleStanza(const buzz::XmlElement* stanza) {
VLOG(1) << "Stanza received: "
<< notifier::XmlElementToString(*stanza);
if (IsValidCacheInvalidationIqPacket(stanza)) {
VLOG(2) << "Queueing stanza";
QueueStanza(stanza);
return true;
}
VLOG(2) << "Stanza skipped";
return false;
}
private:
bool IsValidCacheInvalidationIqPacket(const buzz::XmlElement* stanza) {
// We deliberately minimize the verification we do here: see
// http://crbug.com/71285 .
return MatchRequestIq(stanza, buzz::STR_SET, kQnData);
}
bool GetCacheInvalidationIqPacketData(const buzz::XmlElement* stanza,
std::string* data) {
DCHECK(IsValidCacheInvalidationIqPacket(stanza));
const buzz::XmlElement* cache_invalidation_iq_packet =
stanza->FirstNamed(kQnData);
if (!cache_invalidation_iq_packet) {
LOG(ERROR) << "Could not find cache invalidation IQ packet element";
return false;
}
*data = cache_invalidation_iq_packet->BodyText();
return true;
}
scoped_ptr<Callback1<const std::string&>::Type> callback_;
DISALLOW_COPY_AND_ASSIGN(CacheInvalidationListenTask);
};
// A task that sends a single outbound ClientInvalidation message.
class CacheInvalidationSendMessageTask : public buzz::XmppTask {
public:
CacheInvalidationSendMessageTask(Task* parent,
const buzz::Jid& to_jid,
const std::string& msg,
int seq,
const std::string& sid)
: XmppTask(parent, buzz::XmppEngine::HL_SINGLE),
to_jid_(to_jid), msg_(msg), seq_(seq), sid_(sid) {}
virtual ~CacheInvalidationSendMessageTask() {}
virtual int ProcessStart() {
scoped_ptr<buzz::XmlElement> stanza(
MakeCacheInvalidationIqPacket(to_jid_, task_id(), msg_,
seq_, sid_));
VLOG(1) << "Sending message: "
<< notifier::XmlElementToString(*stanza.get());
if (SendStanza(stanza.get()) != buzz::XMPP_RETURN_OK) {
VLOG(2) << "Error when sending message";
return STATE_ERROR;
}
return STATE_RESPONSE;
}
virtual int ProcessResponse() {
const buzz::XmlElement* stanza = NextStanza();
if (stanza == NULL) {
VLOG(2) << "CacheInvalidationSendMessageTask blocked...";
return STATE_BLOCKED;
}
VLOG(2) << "CacheInvalidationSendMessageTask response received: "
<< notifier::XmlElementToString(*stanza);
// TODO(akalin): Handle errors here.
return STATE_DONE;
}
virtual bool HandleStanza(const buzz::XmlElement* stanza) {
VLOG(1) << "Stanza received: "
<< notifier::XmlElementToString(*stanza);
if (!MatchResponseIq(stanza, to_jid_, task_id())) {
VLOG(2) << "Stanza skipped";
return false;
}
VLOG(2) << "Queueing stanza";
QueueStanza(stanza);
return true;
}
private:
static buzz::XmlElement* MakeCacheInvalidationIqPacket(
const buzz::Jid& to_jid,
const std::string& task_id,
const std::string& msg,
int seq, const std::string& sid) {
buzz::XmlElement* iq = MakeIq(buzz::STR_SET, to_jid, task_id);
buzz::XmlElement* cache_invalidation_iq_packet =
new buzz::XmlElement(kQnData, true);
iq->AddElement(cache_invalidation_iq_packet);
cache_invalidation_iq_packet->SetAttr(kQnSeq, base::IntToString(seq));
cache_invalidation_iq_packet->SetAttr(kQnSid, sid);
cache_invalidation_iq_packet->SetAttr(kQnServiceUrl, kServiceUrl);
cache_invalidation_iq_packet->SetBodyText(msg);
return iq;
}
const buzz::Jid to_jid_;
std::string msg_;
int seq_;
std::string sid_;
DISALLOW_COPY_AND_ASSIGN(CacheInvalidationSendMessageTask);
};
std::string MakeSid() {
uint64 sid = base::RandUint64();
return std::string("chrome-sync-") + base::Uint64ToString(sid);
}
} // namespace
CacheInvalidationPacketHandler::CacheInvalidationPacketHandler(
base::WeakPtr<talk_base::Task> base_task,
invalidation::InvalidationClient* invalidation_client)
: scoped_callback_factory_(ALLOW_THIS_IN_INITIALIZER_LIST(this)),
base_task_(base_task),
invalidation_client_(invalidation_client),
seq_(0),
sid_(MakeSid()) {
CHECK(base_task_.get());
// Owned by base_task. Takes ownership of the callback.
CacheInvalidationListenTask* listen_task =
new CacheInvalidationListenTask(
base_task_, scoped_callback_factory_.NewCallback(
&CacheInvalidationPacketHandler::HandleInboundPacket));
listen_task->Start();
}
CacheInvalidationPacketHandler::~CacheInvalidationPacketHandler() {
DCHECK(non_thread_safe_.CalledOnValidThread());
}
void CacheInvalidationPacketHandler::HandleOutboundPacket(
invalidation::NetworkEndpoint* network_endpoint) {
DCHECK(non_thread_safe_.CalledOnValidThread());
if (!base_task_.get()) {
return;
}
CHECK_EQ(network_endpoint, invalidation_client_->network_endpoint());
invalidation::string message;
network_endpoint->TakeOutboundMessage(&message);
std::string encoded_message;
if (!base::Base64Encode(message, &encoded_message)) {
LOG(ERROR) << "Could not base64-encode message to send: "
<< message;
return;
}
// Owned by base_task_.
CacheInvalidationSendMessageTask* send_message_task =
new CacheInvalidationSendMessageTask(base_task_,
buzz::Jid(kBotJid),
encoded_message,
seq_, sid_);
send_message_task->Start();
++seq_;
}
void CacheInvalidationPacketHandler::HandleInboundPacket(
const std::string& packet) {
DCHECK(non_thread_safe_.CalledOnValidThread());
invalidation::NetworkEndpoint* network_endpoint =
invalidation_client_->network_endpoint();
std::string decoded_message;
if (!base::Base64Decode(packet, &decoded_message)) {
LOG(ERROR) << "Could not base64-decode received message: "
<< packet;
return;
}
network_endpoint->HandleInboundMessage(decoded_message);
}
} // namespace sync_notifier