// Copyright 2015 The Weave Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. #include "src/device_registration_info.h" #include <algorithm> #include <memory> #include <set> #include <utility> #include <vector> #include <base/bind.h> #include <base/json/json_reader.h> #include <base/json/json_writer.h> #include <base/strings/string_number_conversions.h> #include <base/strings/stringprintf.h> #include <base/values.h> #include <weave/provider/http_client.h> #include <weave/provider/network.h> #include <weave/provider/task_runner.h> #include "src/bind_lambda.h" #include "src/commands/cloud_command_proxy.h" #include "src/commands/schema_constants.h" #include "src/data_encoding.h" #include "src/http_constants.h" #include "src/json_error_codes.h" #include "src/notification/xmpp_channel.h" #include "src/privet/auth_manager.h" #include "src/string_utils.h" #include "src/utils.h" namespace weave { const char kErrorAlreayRegistered[] = "already_registered"; namespace { const int kPollingPeriodSeconds = 7; const int kBackupPollingPeriodMinutes = 30; namespace fetch_reason { const char kDeviceStart[] = "device_start"; // Initial queue fetch at startup. const char kRegularPull[] = "regular_pull"; // Regular fetch before XMPP is up. const char kNewCommand[] = "new_command"; // A new command is available. const char kJustInCase[] = "just_in_case"; // Backup fetch when XMPP is live. 
} // namespace fetch_reason using provider::HttpClient; inline void SetUnexpectedError(ErrorPtr* error) { Error::AddTo(error, FROM_HERE, "unexpected_response", "Unexpected GCD error"); } void ParseGCDError(const base::DictionaryValue* json, ErrorPtr* error) { const base::Value* list_value = nullptr; const base::ListValue* error_list = nullptr; if (!json->Get("error.errors", &list_value) || !list_value->GetAsList(&error_list)) { SetUnexpectedError(error); return; } for (size_t i = 0; i < error_list->GetSize(); i++) { const base::Value* error_value = nullptr; const base::DictionaryValue* error_object = nullptr; if (!error_list->Get(i, &error_value) || !error_value->GetAsDictionary(&error_object)) { SetUnexpectedError(error); continue; } std::string error_code, error_message; if (error_object->GetString("reason", &error_code) && error_object->GetString("message", &error_message)) { Error::AddTo(error, FROM_HERE, error_code, error_message); } else { SetUnexpectedError(error); } } } std::string AppendQueryParams(const std::string& url, const WebParamList& params) { CHECK_EQ(std::string::npos, url.find_first_of("?#")); if (params.empty()) return url; return url + '?' 
+ WebParamsEncode(params);
}

// Concatenates |url| and |subpath| and appends |params| as a query string.
// A '/' is inserted between the two only when |url| is non-empty, does not
// already end with one and |subpath| is non-empty; in that case |subpath| must
// not itself start with '/' (CHECKed).
std::string BuildURL(const std::string& url,
                     const std::string& subpath,
                     const WebParamList& params) {
  std::string result = url;
  if (!result.empty() && result.back() != '/' && !subpath.empty()) {
    CHECK_NE('/', subpath.front());
    result += '/';
  }
  result += subpath;
  return AppendQueryParams(result, params);
}

// Completion adapters for requests whose outcome the caller does not need in
// full.

// Discards the error and runs |cb|.
void IgnoreCloudErrorWithCallback(const base::Closure& cb, ErrorPtr) {
  cb.Run();
}

// Discards the error.
void IgnoreCloudError(ErrorPtr) {}

// Discards both the response body and the error.
void IgnoreCloudResult(const base::DictionaryValue&, ErrorPtr error) {}

// Discards the response body and forwards only |error| to |cb|.
void IgnoreCloudResultWithCallback(const DoneCallback& cb,
                                   const base::DictionaryValue&,
                                   ErrorPtr error) {
  cb.Run(std::move(error));
}

// Collects the pieces of a single HTTP request (method, URL, body, content
// type and bearer token) and dispatches it through an HttpClient.
// |transport| is stored as a raw pointer and is not owned.
class RequestSender final {
 public:
  RequestSender(HttpClient::Method method,
                const std::string& url,
                HttpClient* transport)
      : method_{method}, url_{url}, transport_{transport} {}

  // Sends the request through |transport_|. |callback| receives the response
  // with a null error on success, or an empty response together with the
  // error on failure. Every call takes the next value of a function-local
  // counter, which is used only to correlate the log lines of one request.
  void Send(const HttpClient::SendRequestCallback& callback) {
    static int debug_id = 0;
    ++debug_id;
    VLOG(1) << "Sending request. id:" << debug_id
            << " method:" << EnumToString(method_) << " url:" << url_;
    VLOG(2) << "Request data: " << data_;
    // Captureless lambda: |debug_id| and |callback| are bound in below, so
    // the completion handler does not refer back to this object.
    auto on_done = [](
        int debug_id, const HttpClient::SendRequestCallback& callback,
        std::unique_ptr<HttpClient::Response> response, ErrorPtr error) {
      if (error) {
        VLOG(1) << "Request failed, id=" << debug_id
                << ", reason: " << error->GetCode()
                << ", message: " << error->GetMessage();
        return callback.Run({}, std::move(error));
      }
      VLOG(1) << "Request succeeded. id:" << debug_id
              << " status:" << response->GetStatusCode();
      VLOG(2) << "Response data: " << response->GetData();
      callback.Run(std::move(response), nullptr);
    };
    transport_->SendRequest(method_, url_, GetFullHeaders(), data_,
                            base::Bind(on_done, debug_id, callback));
  }

  // Sets the token sent as "Bearer <token>" in the authorization header. An
  // empty token (the default) means no authorization header is sent.
  void SetAccessToken(const std::string& access_token) {
    access_token_ = access_token;
  }

  // Sets the raw request body and the content type sent with it.
  void SetData(const std::string& data, const std::string& mime_type) {
    data_ = data;
    mime_type_ = mime_type;
  }

  // Encodes |data| with WebParamsEncode() and uses it as a form-encoded body.
  void SetFormData(
      const std::vector<std::pair<std::string, std::string>>& data) {
    SetData(WebParamsEncode(data), http::kWwwFormUrlEncoded);
  }

  // Serializes |json| and uses it as a JSON body. Serialization failure is
  // fatal (CHECK).
  void SetJsonData(const base::Value& json) {
    std::string data;
    CHECK(base::JSONWriter::Write(json, &data));
    SetData(data, http::kJsonUtf8);
  }

 private:
  // Builds the header list: authorization (only if a token was set) followed
  // by content type (only if one was set).
  HttpClient::Headers GetFullHeaders() const {
    HttpClient::Headers headers;
    if (!access_token_.empty())
      headers.emplace_back(http::kAuthorization, "Bearer " + access_token_);
    if (!mime_type_.empty())
      headers.emplace_back(http::kContentType, mime_type_);
    return headers;
  }

  HttpClient::Method method_;
  std::string url_;
  std::string data_;
  std::string mime_type_;
  std::string access_token_;
  HttpClient* transport_{nullptr};

  DISALLOW_COPY_AND_ASSIGN(RequestSender);
};

// Parses the body of |response| as a JSON object.
// Returns the parsed dictionary on success. On failure (unexpected content
// type, malformed JSON, or a top-level value that is not an object) an error
// is added to |error| and no dictionary is returned.
std::unique_ptr<base::DictionaryValue> ParseJsonResponse(
    const HttpClient::Response& response,
    ErrorPtr* error) {
  // Make sure we have a correct content type. Do not try to parse
  // binary files, or HTML output. Limit to application/json and text/plain.
std::string content_type =
      SplitAtFirst(response.GetContentType(), ";", true).first;
  if (content_type != http::kJson && content_type != http::kPlain) {
    // The full, unsplit header value is what gets reported.
    return Error::AddTo(
        error, FROM_HERE, "non_json_content_type",
        "Unexpected content type: \'" + response.GetContentType() + "\'");
  }

  const std::string& json = response.GetData();
  std::string error_message;
  auto value = base::JSONReader::ReadAndReturnError(json, base::JSON_PARSE_RFC,
                                                    nullptr, &error_message);
  if (!value) {
    Error::AddToPrintf(error, FROM_HERE, errors::json::kParseError,
                       "Error '%s' occurred parsing JSON string '%s'",
                       error_message.c_str(), json.c_str());
    return std::unique_ptr<base::DictionaryValue>();
  }
  base::DictionaryValue* dict_value = nullptr;
  if (!value->GetAsDictionary(&dict_value)) {
    Error::AddToPrintf(error, FROM_HERE, errors::json::kObjectExpected,
                       "Response is not a valid JSON object: '%s'",
                       json.c_str());
    return std::unique_ptr<base::DictionaryValue>();
  } else {
    // |dict_value| is about to be wrapped in the returned unique_ptr, so
    // |value| gives up its ownership here; presumably both point at the same
    // object - TODO confirm against base::Value::GetAsDictionary().
base::IgnoreResult(value.release());
  }
  return std::unique_ptr<base::DictionaryValue>(dict_value);
}

// Returns true when the status code of |response| lies in
// [http::kContinue, http::kBadRequest).
bool IsSuccessful(const HttpClient::Response& response) {
  int code = response.GetStatusCode();
  return code >= http::kContinue && code < http::kBadRequest;
}

}  // anonymous namespace

// Stores the collaborators as raw pointers, configures the retry back-off
// shared by cloud and OAuth requests, derives the initial GCD state and
// subscribes to component manager notifications.
DeviceRegistrationInfo::DeviceRegistrationInfo(
    Config* config,
    ComponentManager* component_manager,
    provider::TaskRunner* task_runner,
    provider::HttpClient* http_client,
    provider::Network* network,
    privet::AuthManager* auth_manager)
    : http_client_{http_client},
      task_runner_{task_runner},
      config_{config},
      component_manager_{component_manager},
      network_{network},
      auth_manager_{auth_manager} {
  // Back-off policy: 1 s initial delay, doubled per failure, 10% jitter,
  // capped at 30 s. Both back-off entries below point at this one policy
  // object.
  cloud_backoff_policy_.reset(new BackoffEntry::Policy{});
  cloud_backoff_policy_->num_errors_to_ignore = 0;
  cloud_backoff_policy_->initial_delay_ms = 1000;
  cloud_backoff_policy_->multiply_factor = 2.0;
  cloud_backoff_policy_->jitter_factor = 0.1;
  cloud_backoff_policy_->maximum_backoff_ms = 30000;
  cloud_backoff_policy_->entry_lifetime_ms = -1;
  cloud_backoff_policy_->always_use_initial_delay = false;
  cloud_backoff_entry_.reset(new BackoffEntry{cloud_backoff_policy_.get()});
  oauth2_backoff_entry_.reset(new BackoffEntry{cloud_backoff_policy_.get()});

  // A cloud id without the complete credential set is taken to mean that the
  // registration was revoked.
  bool revoked =
      !GetSettings().cloud_id.empty() && !HaveRegistrationCredentials();
  gcd_state_ = revoked ?
GcdState::kInvalidCredentials : GcdState::kUnconfigured;

  // All subscriptions bind a weak pointer, so a notification delivered after
  // this object is gone is not dispatched to it.
  component_manager_->AddTraitDefChangedCallback(base::Bind(
      &DeviceRegistrationInfo::OnTraitDefsChanged, weak_factory_.GetWeakPtr()));
  component_manager_->AddComponentTreeChangedCallback(
      base::Bind(&DeviceRegistrationInfo::OnComponentTreeChanged,
                 weak_factory_.GetWeakPtr()));
  component_manager_->AddStateChangedCallback(base::Bind(
      &DeviceRegistrationInfo::OnStateChanged, weak_factory_.GetWeakPtr()));
}

DeviceRegistrationInfo::~DeviceRegistrationInfo() = default;

// Returns the configured service URL extended with |subpath| and |params|.
std::string DeviceRegistrationInfo::GetServiceURL(
    const std::string& subpath,
    const WebParamList& params) const {
  return BuildURL(GetSettings().service_url, subpath, params);
}

// Returns the service URL of this device's own resource,
// "devices/<cloud_id>/<subpath>", with |params| appended. The '/' after the
// cloud id is always present, even for an empty |subpath|. Requires a
// non-empty cloud id (CHECKed).
std::string DeviceRegistrationInfo::GetDeviceURL(
    const std::string& subpath,
    const WebParamList& params) const {
  CHECK(!GetSettings().cloud_id.empty()) << "Must have a valid device ID";
  return BuildURL(GetSettings().service_url,
                  "devices/" + GetSettings().cloud_id + "/" + subpath, params);
}

// Returns the configured OAuth URL extended with |subpath| and |params|.
std::string DeviceRegistrationInfo::GetOAuthURL(
    const std::string& subpath,
    const WebParamList& params) const {
  return BuildURL(GetSettings().oauth_url, subpath, params);
}

// If the device has registration credentials, starts the notification channel
// and schedules the cloud connection; otherwise does nothing.
void DeviceRegistrationInfo::Start() {
  if (HaveRegistrationCredentials()) {
    StartNotificationChannel();
    // Wait a significant amount of time for local daemons to publish their
    // state to Buffet before publishing it to the cloud.
    // TODO(wiley) We could do a lot of things here to either expose this
    //             timeout as a configurable knob or allow local
    //             daemons to signal that their state is up to date so that
    //             we need not wait for them.
ScheduleCloudConnection(base::TimeDelta::FromSeconds(5));
  }
}

// Switches the GCD state to kConnecting and posts ConnectToCloud() (with a
// null error) to run after |delay|. The state change happens even when there
// is no task runner to post to.
void DeviceRegistrationInfo::ScheduleCloudConnection(
    const base::TimeDelta& delay) {
  SetGcdState(GcdState::kConnecting);
  if (!task_runner_)
    return;  // Assume we're in test
  task_runner_->PostDelayedTask(
      FROM_HERE,
      base::Bind(&DeviceRegistrationInfo::ConnectToCloud, AsWeakPtr(), nullptr),
      delay);
}

// Returns true when the refresh token, the cloud id and the robot account are
// all non-empty in the current settings.
bool DeviceRegistrationInfo::HaveRegistrationCredentials() const {
  return !GetSettings().refresh_token.empty() &&
         !GetSettings().cloud_id.empty() &&
         !GetSettings().robot_account.empty();
}

// Like HaveRegistrationCredentials(), but additionally adds a
// "device_not_registered" error to |error| when the credentials are missing.
// |error| may be null (ConnectToCloud() passes nullptr).
bool DeviceRegistrationInfo::VerifyRegistrationCredentials(
    ErrorPtr* error) const {
  const bool have_credentials = HaveRegistrationCredentials();

  VLOG(2) << "Device registration record "
          << ((have_credentials) ? "found" : "not found.");
  if (!have_credentials) {
    // The value of Error::AddTo() is returned directly; presumably it
    // converts to false here - TODO confirm.
    return Error::AddTo(error, FROM_HERE, "device_not_registered",
                        "No valid device registration record found");
  }
  return true;
}

// Parses an OAuth server response.
// Returns the parsed JSON object for responses below http::kBadRequest. For a
// parseable error response an error built from its "error" and
// "error_description" fields is added to |error| instead; an "invalid_grant"
// error code additionally moves the GCD state to kInvalidCredentials. If the
// body cannot be parsed at all, the result of ParseJsonResponse() is passed
// through unchanged.
std::unique_ptr<base::DictionaryValue>
DeviceRegistrationInfo::ParseOAuthResponse(const HttpClient::Response& response,
                                           ErrorPtr* error) {
  int code = response.GetStatusCode();
  auto resp = ParseJsonResponse(response, error);
  if (resp && code >= http::kBadRequest) {
    std::string error_code, error_message;
    if (!resp->GetString("error", &error_code)) {
      error_code = "unexpected_response";
    }
    if (error_code == "invalid_grant") {
      LOG(INFO) << "The device's registration has been revoked.";
      SetGcdState(GcdState::kInvalidCredentials);
    }
    // I have never actually seen an error_description returned.
if (!resp->GetString("error_description", &error_message)) {
      error_message = "Unexpected OAuth error";
    }
    return Error::AddTo(error, FROM_HERE, error_code, error_message);
  }
  return resp;
}

// Requests a new access token from the OAuth server using the stored refresh
// token. |callback| is run with a null error once a token has been obtained,
// or with the error when credentials are missing or the server's answer is
// unusable. While the OAuth back-off is active the request is not sent;
// instead this method re-posts itself for when the back-off expires.
void DeviceRegistrationInfo::RefreshAccessToken(const DoneCallback& callback) {
  LOG(INFO) << "Refreshing access token.";

  ErrorPtr error;
  if (!VerifyRegistrationCredentials(&error))
    return callback.Run(std::move(error));

  if (oauth2_backoff_entry_->ShouldRejectRequest()) {
    VLOG(1) << "RefreshToken request delayed for "
            << oauth2_backoff_entry_->GetTimeUntilRelease()
            << " due to backoff policy";
    task_runner_->PostDelayedTask(
        FROM_HERE, base::Bind(&DeviceRegistrationInfo::RefreshAccessToken,
                              AsWeakPtr(), callback),
        oauth2_backoff_entry_->GetTimeUntilRelease());
    return;
  }

  RequestSender sender{HttpClient::Method::kPost, GetOAuthURL("token"),
                       http_client_};
  sender.SetFormData({
      {"refresh_token", GetSettings().refresh_token},
      {"client_id", GetSettings().client_id},
      {"client_secret", GetSettings().client_secret},
      {"grant_type", "refresh_token"},
  });
  sender.Send(base::Bind(&DeviceRegistrationInfo::OnRefreshAccessTokenDone,
                         weak_factory_.GetWeakPtr(), callback));
  VLOG(1) << "Refresh access token request dispatched";
}

// Completion handler for the request sent by RefreshAccessToken().
// A transport-level |error| is recorded in the back-off and the refresh is
// attempted again. Otherwise the new token and its expiration time are
// stored, a disconnected primary notification channel is restarted, the local
// auth info is (re)sent, and |callback| is run.
void DeviceRegistrationInfo::OnRefreshAccessTokenDone(
    const DoneCallback& callback,
    std::unique_ptr<HttpClient::Response> response,
    ErrorPtr error) {
  if (error) {
    VLOG(1) << "Refresh access token failed";
    oauth2_backoff_entry_->InformOfRequest(false);
    return RefreshAccessToken(callback);
  }
  VLOG(1) << "Refresh access token request completed";
  // The server answered, so the back-off records a success even if the answer
  // turns out to be an OAuth error below.
  oauth2_backoff_entry_->InformOfRequest(true);
  auto json = ParseOAuthResponse(*response, &error);
  if (!json)
    return callback.Run(std::move(error));

  int expires_in = 0;
  if (!json->GetString("access_token", &access_token_) ||
      !json->GetInteger("expires_in", &expires_in) || access_token_.empty() ||
      expires_in <= 0) {
    LOG(ERROR) << "Access token unavailable.";
    Error::AddTo(&error, FROM_HERE, "unexpected_server_response",
                 "Access token unavailable");
    return callback.Run(std::move(error));
  }
  access_token_expiration_ =
      base::Time::Now() + base::TimeDelta::FromSeconds(expires_in);
  LOG(INFO) << "Access token is refreshed for additional " << expires_in
            << " seconds.";

  if (primary_notification_channel_ &&
      !primary_notification_channel_->IsConnected()) {
    // If we have disconnected channel, it is due to failed credentials.
    // Now that we have a new access token, retry the connection.
    StartNotificationChannel();
  }

  SendAuthInfo();

  callback.Run(nullptr);
}

// (Re)starts the notification channels: tears down any existing primary
// channel, makes the pull channel the current one at the regular polling
// interval, and then creates and starts a new XMPP primary channel.
// Returns immediately if |notification_channel_starting_| is already set.
void DeviceRegistrationInfo::StartNotificationChannel() {
  if (notification_channel_starting_)
    return;

  LOG(INFO) << "Starting notification channel";

  // Without a Network provider no channel can be started (presumably only the
  // case in tests).
  if (!network_) {
    LOG(INFO) << "No Network, not starting notification channel";
    return;
  }

  if (primary_notification_channel_) {
    primary_notification_channel_->Stop();
    primary_notification_channel_.reset();
    current_notification_channel_ = nullptr;
  }

  // Start with just regular polling at the pre-configured polling interval.
  // Once the primary notification channel is connected successfully, it will
  // call back to OnConnected() and at that time we'll switch to use the
  // primary channel and switch periodic poll into much more infrequent backup
  // poll mode.
const base::TimeDelta pull_interval =
      base::TimeDelta::FromSeconds(kPollingPeriodSeconds);
  if (!pull_channel_) {
    pull_channel_.reset(new PullChannel{pull_interval, task_runner_});
    pull_channel_->Start(this);
  } else {
    pull_channel_->UpdatePullInterval(pull_interval);
  }
  current_notification_channel_ = pull_channel_.get();

  notification_channel_starting_ = true;
  primary_notification_channel_.reset(
      new XmppChannel{GetSettings().robot_account, access_token_,
                      GetSettings().xmpp_endpoint, task_runner_, network_});
  primary_notification_channel_->Start(this);
}

// Registers |callback| for GCD state changes and immediately runs it once
// with the current state.
void DeviceRegistrationInfo::AddGcdStateChangedCallback(
    const Device::GcdStateChangedCallback& callback) {
  gcd_state_changed_callbacks_.push_back(callback);
  callback.Run(gcd_state_);
}

// Builds the device resource sent to the server: identity fields from the
// settings, the notification channel description, and deep copies of the
// current traits and components. "id", "description" and "location" are
// included only when non-empty.
std::unique_ptr<base::DictionaryValue>
DeviceRegistrationInfo::BuildDeviceResource() const {
  std::unique_ptr<base::DictionaryValue> resource{new base::DictionaryValue};
  if (!GetSettings().cloud_id.empty())
    resource->SetString("id", GetSettings().cloud_id);
  resource->SetString("name", GetSettings().name);
  if (!GetSettings().description.empty())
    resource->SetString("description", GetSettings().description);
  if (!GetSettings().location.empty())
    resource->SetString("location", GetSettings().location);
  resource->SetString("modelManifestId", GetSettings().model_id);
  std::unique_ptr<base::DictionaryValue> channel{new base::DictionaryValue};
  if (current_notification_channel_) {
    channel->SetString("supportedType",
                       current_notification_channel_->GetName());
    current_notification_channel_->AddChannelParameters(channel.get());
  } else {
    // No channel selected yet: advertise plain polling.
    channel->SetString("supportedType", "pull");
  }
  // Set() is handed released raw pointers here and below; presumably it takes
  // ownership of them - TODO confirm against base::DictionaryValue::Set().
  resource->Set("channel", channel.release());

  resource->Set("traits", component_manager_->GetTraits().DeepCopy());
  resource->Set("components", component_manager_->GetComponents().DeepCopy());

  return resource;
}

// Fetches this device's resource from the server and passes it to |callback|.
// Without registration credentials |callback| is run right away with an empty
// result and the error.
void DeviceRegistrationInfo::GetDeviceInfo(
    const CloudRequestDoneCallback& callback) {
  ErrorPtr error;
  if (!VerifyRegistrationCredentials(&error))
    return
callback.Run({}, std::move(error));
  DoCloudRequest(HttpClient::Method::kGet, GetDeviceURL(), nullptr, callback);
}

// Reports a registration failure: posts |callback| with |error| to the task
// runner (zero delay) instead of running it inline.
void DeviceRegistrationInfo::RegisterDeviceError(const DoneCallback& callback,
                                                 ErrorPtr error) {
  task_runner_->PostDelayedTask(FROM_HERE,
                                base::Bind(callback, base::Passed(&error)), {});
}

// Starts registering the device with the ticket |ticket_id|.
// The registration is a chain of three requests, each continued from the
// completion handler of the previous one:
//   1) PATCH registrationTickets/<ticket_id> with the device draft (here),
//   2) POST registrationTickets/<ticket_id>/finalize,
//   3) POST to the OAuth "token" endpoint to exchange the authorization code.
// |callback| is posted with the final result. Fails with
// kErrorAlreayRegistered if the device already has credentials.
void DeviceRegistrationInfo::RegisterDevice(const std::string& ticket_id,
                                            const DoneCallback& callback) {
  if (HaveRegistrationCredentials()) {
    ErrorPtr error;
    Error::AddTo(&error, FROM_HERE, kErrorAlreayRegistered,
                 "Unable to register already registered device");
    return RegisterDeviceError(callback, std::move(error));
  }

  std::unique_ptr<base::DictionaryValue> device_draft = BuildDeviceResource();
  CHECK(device_draft);

  base::DictionaryValue req_json;
  req_json.SetString("id", ticket_id);
  req_json.SetString("oauthClientId", GetSettings().client_id);
  req_json.Set("deviceDraft", device_draft.release());

  // No access token exists yet, so the request is identified by the API key.
  auto url = GetServiceURL("registrationTickets/" + ticket_id,
                           {{"key", GetSettings().api_key}});

  RequestSender sender{HttpClient::Method::kPatch, url, http_client_};
  sender.SetJsonData(req_json);
  sender.Send(base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnTicketSent,
                         weak_factory_.GetWeakPtr(), ticket_id, callback));
}

// Completion handler for the ticket PATCH sent by RegisterDevice(). Reports
// any transport, parse or server error through RegisterDeviceError();
// otherwise sends the finalize request for the same ticket.
void DeviceRegistrationInfo::RegisterDeviceOnTicketSent(
    const std::string& ticket_id,
    const DoneCallback& callback,
    std::unique_ptr<provider::HttpClient::Response> response,
    ErrorPtr error) {
  if (error)
    return RegisterDeviceError(callback, std::move(error));
  auto json_resp = ParseJsonResponse(*response, &error);
  if (!json_resp)
    return RegisterDeviceError(callback, std::move(error));

  if (!IsSuccessful(*response)) {
    ParseGCDError(json_resp.get(), &error);
    return RegisterDeviceError(callback, std::move(error));
  }

  std::string url =
      GetServiceURL("registrationTickets/" + ticket_id + "/finalize",
                    {{"key", GetSettings().api_key}});
  RequestSender{HttpClient::Method::kPost, url, http_client_}.Send(
base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnTicketFinalized,
                 weak_factory_.GetWeakPtr(), callback));
}

// Completion handler for the ticket finalize request. Extracts the robot
// account, its authorization code and the new cloud id from the response,
// records the device resource timestamp, and exchanges the authorization code
// for tokens at the OAuth "token" endpoint. Any failure is reported through
// RegisterDeviceError().
void DeviceRegistrationInfo::RegisterDeviceOnTicketFinalized(
    const DoneCallback& callback,
    std::unique_ptr<provider::HttpClient::Response> response,
    ErrorPtr error) {
  if (error)
    return RegisterDeviceError(callback, std::move(error));

  auto json_resp = ParseJsonResponse(*response, &error);
  if (!json_resp)
    return RegisterDeviceError(callback, std::move(error));
  if (!IsSuccessful(*response)) {
    ParseGCDError(json_resp.get(), &error);
    return RegisterDeviceError(callback, std::move(error));
  }

  std::string auth_code;
  std::string cloud_id;
  std::string robot_account;
  const base::DictionaryValue* device_draft_response = nullptr;
  // The last check dereferences |device_draft_response|, which is only
  // reached if the GetDictionary() call before it succeeded (short-circuit).
  if (!json_resp->GetString("robotAccountEmail", &robot_account) ||
      !json_resp->GetString("robotAccountAuthorizationCode", &auth_code) ||
      !json_resp->GetDictionary("deviceDraft", &device_draft_response) ||
      !device_draft_response->GetString("id", &cloud_id)) {
    Error::AddTo(&error, FROM_HERE, "unexpected_response",
                 "Device account missing in response");
    return RegisterDeviceError(callback, std::move(error));
  }

  UpdateDeviceInfoTimestamp(*device_draft_response);

  // Now get access_token and refresh_token
  RequestSender sender2{HttpClient::Method::kPost, GetOAuthURL("token"),
                        http_client_};
  sender2.SetFormData({{"code", auth_code},
                       {"client_id", GetSettings().client_id},
                       {"client_secret", GetSettings().client_secret},
                       {"redirect_uri", "oob"},
                       {"grant_type", "authorization_code"}});
  sender2.Send(base::Bind(&DeviceRegistrationInfo::RegisterDeviceOnAuthCodeSent,
                          weak_factory_.GetWeakPtr(), cloud_id, robot_account,
                          callback));
}

// Completion handler for the authorization code exchange. On success stores
// the access token in memory, commits the cloud id, robot account and refresh
// token to the config, posts |callback| with a null error, and starts
// connecting to the cloud. Any failure is reported through
// RegisterDeviceError().
void DeviceRegistrationInfo::RegisterDeviceOnAuthCodeSent(
    const std::string& cloud_id,
    const std::string& robot_account,
    const DoneCallback& callback,
    std::unique_ptr<provider::HttpClient::Response> response,
    ErrorPtr error) {
  if (error)
    return RegisterDeviceError(callback, std::move(error));
  auto json_resp =
ParseOAuthResponse(*response, &error);
  int expires_in = 0;
  std::string refresh_token;
  // A null |json_resp| (ParseOAuthResponse() failed) is handled by the same
  // branch; the error added here is added on top of whatever is already in
  // |error|.
  if (!json_resp || !json_resp->GetString("access_token", &access_token_) ||
      !json_resp->GetString("refresh_token", &refresh_token) ||
      !json_resp->GetInteger("expires_in", &expires_in) ||
      access_token_.empty() || refresh_token.empty() || expires_in <= 0) {
    Error::AddTo(&error, FROM_HERE, "unexpected_response",
                 "Device access_token missing in response");
    return RegisterDeviceError(callback, std::move(error));
  }

  access_token_expiration_ =
      base::Time::Now() + base::TimeDelta::FromSeconds(expires_in);

  Config::Transaction change{config_};
  change.set_cloud_id(cloud_id);
  change.set_robot_account(robot_account);
  change.set_refresh_token(refresh_token);
  change.Commit();

  task_runner_->PostDelayedTask(FROM_HERE, base::Bind(callback, nullptr), {});

  StartNotificationChannel();
  SendAuthInfo();

  // We're going to respond with our success immediately and we'll connect to
  // cloud shortly after.
  ScheduleCloudConnection({});
}

// Issues an authenticated request to the cloud service.
// |body| is optional; when given it is serialized to JSON. |callback|
// receives the parsed response or the error. Back-off, retries and access
// token refresh are handled by SendCloudRequest() / OnCloudRequestDone().
void DeviceRegistrationInfo::DoCloudRequest(
    HttpClient::Method method,
    const std::string& url,
    const base::DictionaryValue* body,
    const CloudRequestDoneCallback& callback) {
  // We make CloudRequestData shared here because we want to make sure
  // there is only one instance of the callback, since it may have move-only
  // types curried in and making a copy of a callback with move-only types
  // curried-in will invalidate the source callback.
  auto data = std::make_shared<CloudRequestData>();
  data->method = method;
  data->url = url;
  if (body)
    base::JSONWriter::Write(*body, &data->body);
  data->callback = callback;
  SendCloudRequest(data);
}

// Sends (or re-sends) the request described by |data| with the current access
// token. Fails immediately through the request's callback when the device has
// no registration credentials; while the cloud back-off is active the send is
// re-posted for when the back-off expires.
void DeviceRegistrationInfo::SendCloudRequest(
    const std::shared_ptr<const CloudRequestData>& data) {
  // TODO(antonm): Add reauthorization on access token expiration (do not
  // forget about 5xx when fetching new access token).
  // TODO(antonm): Add support for device removal.
ErrorPtr error; if (!VerifyRegistrationCredentials(&error)) return data->callback.Run({}, std::move(error)); if (cloud_backoff_entry_->ShouldRejectRequest()) { VLOG(1) << "Cloud request delayed for " << cloud_backoff_entry_->GetTimeUntilRelease() << " due to backoff policy"; return task_runner_->PostDelayedTask( FROM_HERE, base::Bind(&DeviceRegistrationInfo::SendCloudRequest, AsWeakPtr(), data), cloud_backoff_entry_->GetTimeUntilRelease()); } RequestSender sender{data->method, data->url, http_client_}; sender.SetData(data->body, http::kJsonUtf8); sender.SetAccessToken(access_token_); sender.Send(base::Bind(&DeviceRegistrationInfo::OnCloudRequestDone, AsWeakPtr(), data)); } void DeviceRegistrationInfo::OnCloudRequestDone( const std::shared_ptr<const CloudRequestData>& data, std::unique_ptr<provider::HttpClient::Response> response, ErrorPtr error) { if (error) return RetryCloudRequest(data); int status_code = response->GetStatusCode(); if (status_code == http::kDenied) { cloud_backoff_entry_->InformOfRequest(true); RefreshAccessToken(base::Bind( &DeviceRegistrationInfo::OnAccessTokenRefreshed, AsWeakPtr(), data)); return; } if (status_code >= http::kInternalServerError) { // Request was valid, but server failed, retry. // TODO(antonm): Reconsider status codes, maybe only some require // retry. // TODO(antonm): Support Retry-After header. RetryCloudRequest(data); return; } if (response->GetContentType().empty()) { // Assume no body if no content type. cloud_backoff_entry_->InformOfRequest(true); return data->callback.Run({}, nullptr); } auto json_resp = ParseJsonResponse(*response, &error); if (!json_resp) { cloud_backoff_entry_->InformOfRequest(false); return data->callback.Run({}, std::move(error)); } if (!IsSuccessful(*response)) { ParseGCDError(json_resp.get(), &error); if (status_code == http::kForbidden && error->HasError("rateLimitExceeded")) { // If we exceeded server quota, retry the request later. 
return RetryCloudRequest(data);
    }

    cloud_backoff_entry_->InformOfRequest(false);
    return data->callback.Run({}, std::move(error));
  }

  cloud_backoff_entry_->InformOfRequest(true);
  SetGcdState(GcdState::kConnected);
  data->callback.Run(*json_resp, nullptr);
}

// Records a failed attempt in the cloud back-off, moves the GCD state back to
// kConnecting and sends the request again (SendCloudRequest() applies the
// back-off delay).
void DeviceRegistrationInfo::RetryCloudRequest(
    const std::shared_ptr<const CloudRequestData>& data) {
  // TODO(avakulenko): Tie connecting/connected status to XMPP channel instead.
  SetGcdState(GcdState::kConnecting);
  cloud_backoff_entry_->InformOfRequest(false);
  SendCloudRequest(data);
}

// Continuation of a cloud request that was answered with http::kDenied: once
// the access token has been refreshed the request is sent again. If the
// refresh failed, the request's callback gets the error (and credentials are
// dropped when the grant is invalid).
void DeviceRegistrationInfo::OnAccessTokenRefreshed(
    const std::shared_ptr<const CloudRequestData>& data,
    ErrorPtr error) {
  if (error) {
    // |error| is still needed for the callback, so a clone is handed over.
    CheckAccessTokenError(error->Clone());
    return data->callback.Run({}, std::move(error));
  }
  SendCloudRequest(data);
}

// Removes the stored credentials if |error| contains "invalid_grant".
// A null |error| is accepted and ignored.
void DeviceRegistrationInfo::CheckAccessTokenError(ErrorPtr error) {
  if (error && error->HasError("invalid_grant"))
    RemoveCredentials();
}

// Connects the device to the cloud. Also serves as the completion callback of
// RefreshAccessToken(), which is why it takes |error|: a non-null error ends
// the attempt (dropping the credentials on "invalid_grant"). Without an
// access token one is requested first and this method runs again afterwards.
void DeviceRegistrationInfo::ConnectToCloud(ErrorPtr error) {
  if (error) {
    if (error->HasError("invalid_grant"))
      RemoveCredentials();
    return;
  }

  connected_to_cloud_ = false;
  if (!VerifyRegistrationCredentials(nullptr))
    return;

  if (access_token_.empty()) {
    RefreshAccessToken(
        base::Bind(&DeviceRegistrationInfo::ConnectToCloud, AsWeakPtr()));
    return;
  }

  // Connecting a device to cloud just means that we:
  //   1) push an updated device resource
  //   2) fetch an initial set of outstanding commands
  //   3) abort any commands that we've previously marked as "in progress"
  //      or as being in an error state; publish queued commands
  UpdateDeviceResource(
      base::Bind(&DeviceRegistrationInfo::OnConnectedToCloud, AsWeakPtr()));
}

// Runs once the device resource pushed by ConnectToCloud() has been accepted:
// marks the device as connected, fetches the initial command list and
// publishes pending state updates. Does nothing if |error| is set.
void DeviceRegistrationInfo::OnConnectedToCloud(ErrorPtr error) {
  if (error)
    return;
  LOG(INFO) << "Device connected to cloud server";
  connected_to_cloud_ = true;
  FetchCommands(base::Bind(&DeviceRegistrationInfo::ProcessInitialCommandList,
                           AsWeakPtr()),
                fetch_reason::kDeviceStart);
  // In case there are any pending state updates since we sent off the initial
  // UpdateDeviceResource() request, update the server with any state changes.
  PublishStateUpdates();
}

// Stores the new name, description and location in the config and, if the
// device is registered, pushes the updated device resource to the server
// (the outcome of that push is ignored).
void DeviceRegistrationInfo::UpdateDeviceInfo(const std::string& name,
                                              const std::string& description,
                                              const std::string& location) {
  Config::Transaction change{config_};
  change.set_name(name);
  change.set_description(description);
  change.set_location(location);
  change.Commit();

  if (HaveRegistrationCredentials()) {
    UpdateDeviceResource(base::Bind(&IgnoreCloudError));
  }
}

// Updates the local access settings in the config.
// NOTE(review): unlike UpdateDeviceInfo() there is no explicit Commit() here;
// presumably the transaction commits when |change| goes out of scope -
// confirm against Config::Transaction.
void DeviceRegistrationInfo::UpdateBaseConfig(AuthScope anonymous_access_role,
                                              bool local_discovery_enabled,
                                              bool local_pairing_enabled) {
  Config::Transaction change(config_);
  change.set_local_anonymous_access_role(anonymous_access_role);
  change.set_local_discovery_enabled(local_discovery_enabled);
  change.set_local_pairing_enabled(local_pairing_enabled);
}

// Overrides the cloud service settings; every empty argument leaves the
// corresponding setting untouched. Refused with kErrorAlreayRegistered (added
// to |error|) once the device is registered. Returns true on success.
// NOTE(review): no explicit Commit() here either; see UpdateBaseConfig().
bool DeviceRegistrationInfo::UpdateServiceConfig(
    const std::string& client_id,
    const std::string& client_secret,
    const std::string& api_key,
    const std::string& oauth_url,
    const std::string& service_url,
    const std::string& xmpp_endpoint,
    ErrorPtr* error) {
  if (HaveRegistrationCredentials()) {
    return Error::AddTo(error, FROM_HERE, kErrorAlreayRegistered,
                        "Unable to change config for registered device");
  }
  Config::Transaction change{config_};
  if (!client_id.empty())
    change.set_client_id(client_id);
  if (!client_secret.empty())
    change.set_client_secret(client_secret);
  if (!api_key.empty())
    change.set_api_key(api_key);
  if (!oauth_url.empty())
    change.set_oauth_url(oauth_url);
  if (!service_url.empty())
    change.set_service_url(service_url);
  if (!xmpp_endpoint.empty())
    change.set_xmpp_endpoint(xmpp_endpoint);
  return true;
}

// Sends |command_patch| as a PATCH to "commands/<command_id>" on the service.
// |callback| receives only the error of the request; the response body is
// dropped.
void DeviceRegistrationInfo::UpdateCommand(
    const std::string& command_id,
    const base::DictionaryValue& command_patch,
    const DoneCallback& callback) {
  DoCloudRequest(HttpClient::Method::kPatch,
                 GetServiceURL("commands/" + command_id), &command_patch,
                 base::Bind(&IgnoreCloudResultWithCallback, callback));
}

// Marks the command |command_id| as aborted on the server, attaching |error|
// (if any) as the command's error. The outcome of the update is ignored.
void
DeviceRegistrationInfo::NotifyCommandAborted(const std::string& command_id,
                                             ErrorPtr error) {
  base::DictionaryValue command_patch;
  command_patch.SetString(commands::attributes::kCommand_State,
                          EnumToString(Command::State::kAborted));
  if (error) {
    command_patch.Set(commands::attributes::kCommand_Error,
                      ErrorInfoToJson(*error).release());
  }
  UpdateCommand(command_id, command_patch, base::Bind(&IgnoreCloudError));
}

// Requests that the device resource be pushed to the server and registers
// |callback| to be told the outcome. The callback is queued first; if an
// update is already in flight it will be served by a later push, otherwise a
// push is started right away.
void DeviceRegistrationInfo::UpdateDeviceResource(
    const DoneCallback& callback) {
  queued_resource_update_callbacks_.emplace_back(callback);
  if (!in_progress_resource_update_callbacks_.empty()) {
    VLOG(1) << "Another request is already pending.";
    return;
  }

  StartQueuedUpdateDeviceResource();
}

// Pushes the device resource to the server on behalf of all callbacks that
// are currently queued or in progress; does nothing if there are none. If the
// server-side timestamp of the resource is not known yet, it is fetched first
// and the push is started from OnDeviceInfoRetrieved().
void DeviceRegistrationInfo::StartQueuedUpdateDeviceResource() {
  if (in_progress_resource_update_callbacks_.empty() &&
      queued_resource_update_callbacks_.empty())
    return;

  if (last_device_resource_updated_timestamp_.empty()) {
    // We don't know the current time stamp of the device resource from the
    // server side. We need to provide the time stamp to the server as part of
    // the request to guard against out-of-order requests overwriting settings
    // specified by later requests.
VLOG(1) << "Getting the last device resource timestamp from server...";
    GetDeviceInfo(base::Bind(&DeviceRegistrationInfo::OnDeviceInfoRetrieved,
                             AsWeakPtr()));
    return;
  }

  // Everything queued so far is served by the request sent below: move it to
  // the in-progress list (appending, since a retried push may still have
  // callbacks there).
  in_progress_resource_update_callbacks_.insert(
      in_progress_resource_update_callbacks_.end(),
      queued_resource_update_callbacks_.begin(),
      queued_resource_update_callbacks_.end());
  queued_resource_update_callbacks_.clear();

  VLOG(1) << "Updating GCD server with CDD...";
  std::unique_ptr<base::DictionaryValue> device_resource =
      BuildDeviceResource();
  CHECK(device_resource);

  std::string url = GetDeviceURL(
      {}, {{"lastUpdateTimeMs", last_device_resource_updated_timestamp_}});

  DoCloudRequest(HttpClient::Method::kPut, url, device_resource.get(),
                 base::Bind(&DeviceRegistrationInfo::OnUpdateDeviceResourceDone,
                            AsWeakPtr()));
}

// Claims the root client auth token for the cloud and uploads it, together
// with the device id and certificate fingerprint, to the device's
// "upsertLocalAuthInfo" endpoint. Does nothing when there is no auth manager,
// when an upload is already in progress, or when the cloud already owns the
// token.
void DeviceRegistrationInfo::SendAuthInfo() {
  if (!auth_manager_ || auth_info_update_inprogress_)
    return;

  if (GetSettings().root_client_token_owner == RootClientTokenOwner::kCloud) {
    // Avoid re-claiming if device is already claimed by the Cloud. Cloud is
    // allowed to re-claim device at any time. However this will invalidate all
    // issued tokens.
return;
  }

  auth_info_update_inprogress_ = true;

  std::vector<uint8_t> token = auth_manager_->ClaimRootClientAuthToken(
      RootClientTokenOwner::kCloud, nullptr);
  CHECK(!token.empty());
  std::string id = GetSettings().device_id;
  std::string token_base64 = Base64Encode(token);
  std::string fingerprint =
      Base64Encode(auth_manager_->GetCertificateFingerprint());

  std::unique_ptr<base::DictionaryValue> auth{new base::DictionaryValue};
  auth->SetString("localId", id);
  auth->SetString("clientToken", token_base64);
  auth->SetString("certFingerprint", fingerprint);
  std::unique_ptr<base::DictionaryValue> root{new base::DictionaryValue};
  root->Set("localAuthInfo", auth.release());

  std::string url = GetDeviceURL("upsertLocalAuthInfo", {});
  // The claimed |token| is bound into the completion callback so that it can
  // be confirmed once the server has accepted it.
  DoCloudRequest(HttpClient::Method::kPost, url, root.get(),
                 base::Bind(&DeviceRegistrationInfo::OnSendAuthInfoDone,
                            AsWeakPtr(), token));
}

// Completion handler for SendAuthInfo(). On success confirms |token| with the
// auth manager; if the request failed or the confirmation is rejected,
// SendAuthInfo() is posted to run again. |body| is unused.
void DeviceRegistrationInfo::OnSendAuthInfoDone(
    const std::vector<uint8_t>& token,
    const base::DictionaryValue& body,
    ErrorPtr error) {
  CHECK(auth_info_update_inprogress_);
  auth_info_update_inprogress_ = false;

  if (!error && auth_manager_->ConfirmClientAuthToken(token, nullptr))
    return;

  task_runner_->PostDelayedTask(
      FROM_HERE, base::Bind(&DeviceRegistrationInfo::SendAuthInfo, AsWeakPtr()),
      {});
}

// Completion handler for the GetDeviceInfo() requests issued while updating
// the device resource: on success records the resource timestamp and, if one
// was found, starts the pending push; on failure hands over to
// OnUpdateDeviceResourceError().
void DeviceRegistrationInfo::OnDeviceInfoRetrieved(
    const base::DictionaryValue& device_info,
    ErrorPtr error) {
  if (error)
    return OnUpdateDeviceResourceError(std::move(error));
  if (UpdateDeviceInfoTimestamp(device_info))
    StartQueuedUpdateDeviceResource();
}

// Stores the timestamp of the device resource |device_info| in
// |last_device_resource_updated_timestamp_|. Returns false (after logging a
// warning) if the resource carries no timestamp.
bool DeviceRegistrationInfo::UpdateDeviceInfoTimestamp(
    const base::DictionaryValue& device_info) {
  // For newly created devices, "lastUpdateTimeMs" may not be present, but
  // "creationTimeMs" should be there at least.
if (!device_info.GetString("lastUpdateTimeMs",
                             &last_device_resource_updated_timestamp_) &&
      !device_info.GetString("creationTimeMs",
                             &last_device_resource_updated_timestamp_)) {
    LOG(WARNING) << "Device resource timestamp is missing";
    return false;
  }
  return true;
}

// Completion handler for the device resource PUT. On success records the new
// timestamp from the response, runs all in-progress callbacks with a null
// error and starts the next push if more callbacks were queued meanwhile. On
// failure hands over to OnUpdateDeviceResourceError().
void DeviceRegistrationInfo::OnUpdateDeviceResourceDone(
    const base::DictionaryValue& device_info,
    ErrorPtr error) {
  if (error)
    return OnUpdateDeviceResourceError(std::move(error));
  UpdateDeviceInfoTimestamp(device_info);
  // Move the callback list into a local so that if a callback triggers
  // another call to UpdateDeviceResource(), we do not modify the list we are
  // iterating over.
  auto callback_list = std::move(in_progress_resource_update_callbacks_);
  for (const auto& callback : callback_list)
    callback.Run(nullptr);
  StartQueuedUpdateDeviceResource();
}

// Handles a failed device resource update (or a failed timestamp fetch done
// on its behalf). A stale-timestamp rejection is retried after fetching the
// current timestamp; any other error is reported to all in-progress
// callbacks, after which the next queued push is started.
// |error| must not be null.
void DeviceRegistrationInfo::OnUpdateDeviceResourceError(ErrorPtr error) {
  if (error->HasError("invalid_last_update_time_ms")) {
    // If the server rejected our previous request, retrieve the latest
    // timestamp from the server and retry.
    VLOG(1) << "Getting the last device resource timestamp from server...";
    GetDeviceInfo(base::Bind(&DeviceRegistrationInfo::OnDeviceInfoRetrieved,
                             AsWeakPtr()));
    return;
  }

  // Move the callback list into a local so that if a callback triggers
  // another call to UpdateDeviceResource(), we do not modify the list we are
  // iterating over.
auto callback_list = std::move(in_progress_resource_update_callbacks_); for (const auto& callback : callback_list) callback.Run(error->Clone()); StartQueuedUpdateDeviceResource(); } void DeviceRegistrationInfo::OnFetchCommandsDone( const base::Callback<void(const base::ListValue&, ErrorPtr)>& callback, const base::DictionaryValue& json, ErrorPtr error) { OnFetchCommandsReturned(); if (error) return callback.Run({}, std::move(error)); const base::ListValue* commands{nullptr}; if (!json.GetList("commands", &commands)) VLOG(2) << "No commands in the response."; const base::ListValue empty; callback.Run(commands ? *commands : empty, nullptr); } void DeviceRegistrationInfo::OnFetchCommandsReturned() { fetch_commands_request_sent_ = false; // If we have additional requests queued, send them out now. if (fetch_commands_request_queued_) FetchAndPublishCommands(queued_fetch_reason_); } void DeviceRegistrationInfo::FetchCommands( const base::Callback<void(const base::ListValue&, ErrorPtr)>& callback, const std::string& reason) { fetch_commands_request_sent_ = true; fetch_commands_request_queued_ = false; DoCloudRequest( HttpClient::Method::kGet, GetServiceURL("commands/queue", {{"deviceId", GetSettings().cloud_id}, {"reason", reason}}), nullptr, base::Bind(&DeviceRegistrationInfo::OnFetchCommandsDone, AsWeakPtr(), callback)); } void DeviceRegistrationInfo::FetchAndPublishCommands( const std::string& reason) { if (fetch_commands_request_sent_) { fetch_commands_request_queued_ = true; queued_fetch_reason_ = reason; return; } FetchCommands(base::Bind(&DeviceRegistrationInfo::PublishCommands, weak_factory_.GetWeakPtr()), reason); } void DeviceRegistrationInfo::ProcessInitialCommandList( const base::ListValue& commands, ErrorPtr error) { if (error) return; for (const base::Value* command : commands) { const base::DictionaryValue* command_dict{nullptr}; if (!command->GetAsDictionary(&command_dict)) { LOG(WARNING) << "Not a command dictionary: " << *command; continue; } std::string 
command_state; if (!command_dict->GetString("state", &command_state)) { LOG(WARNING) << "Command with no state at " << *command; continue; } if (command_state == "error" && command_state == "inProgress" && command_state == "paused") { // It's a limbo command, abort it. std::string command_id; if (!command_dict->GetString("id", &command_id)) { LOG(WARNING) << "Command with no ID at " << *command; continue; } std::unique_ptr<base::DictionaryValue> cmd_copy{command_dict->DeepCopy()}; cmd_copy->SetString("state", "aborted"); // TODO(wiley) We could consider handling this error case more gracefully. DoCloudRequest(HttpClient::Method::kPut, GetServiceURL("commands/" + command_id), cmd_copy.get(), base::Bind(&IgnoreCloudResult)); } else { // Normal command, publish it to local clients. PublishCommand(*command_dict); } } } void DeviceRegistrationInfo::PublishCommands(const base::ListValue& commands, ErrorPtr error) { if (error) return; for (const base::Value* command : commands) { const base::DictionaryValue* command_dict{nullptr}; if (!command->GetAsDictionary(&command_dict)) { LOG(WARNING) << "Not a command dictionary: " << *command; continue; } PublishCommand(*command_dict); } } void DeviceRegistrationInfo::PublishCommand( const base::DictionaryValue& command) { std::string command_id; ErrorPtr error; auto command_instance = component_manager_->ParseCommandInstance( command, Command::Origin::kCloud, UserRole::kOwner, &command_id, &error); if (!command_instance) { LOG(WARNING) << "Failed to parse a command instance: " << command; if (!command_id.empty()) NotifyCommandAborted(command_id, std::move(error)); return; } // TODO(antonm): Properly process cancellation of commands. 
if (!component_manager_->FindCommand(command_instance->GetID())) { LOG(INFO) << "New command '" << command_instance->GetName() << "' arrived, ID: " << command_instance->GetID(); std::unique_ptr<BackoffEntry> backoff_entry{ new BackoffEntry{cloud_backoff_policy_.get()}}; std::unique_ptr<CloudCommandProxy> cloud_proxy{ new CloudCommandProxy{command_instance.get(), this, component_manager_, std::move(backoff_entry), task_runner_}}; // CloudCommandProxy::CloudCommandProxy() subscribe itself to Command // notifications. When Command is being destroyed it sends // ::OnCommandDestroyed() and CloudCommandProxy deletes itself. cloud_proxy.release(); component_manager_->AddCommand(std::move(command_instance)); } } void DeviceRegistrationInfo::PublishStateUpdates() { // If we have pending state update requests, don't send any more for now. if (device_state_update_pending_) return; auto snapshot = component_manager_->GetAndClearRecordedStateChanges(); if (snapshot.state_changes.empty()) return; std::unique_ptr<base::ListValue> patches{new base::ListValue}; for (auto& state_change : snapshot.state_changes) { std::unique_ptr<base::DictionaryValue> patch{new base::DictionaryValue}; patch->SetString("timeMs", std::to_string(state_change.timestamp.ToJavaTime())); patch->SetString("component", state_change.component); patch->Set("patch", state_change.changed_properties.release()); patches->Append(patch.release()); } base::DictionaryValue body; body.SetString("requestTimeMs", std::to_string(base::Time::Now().ToJavaTime())); body.Set("patches", patches.release()); device_state_update_pending_ = true; DoCloudRequest(HttpClient::Method::kPost, GetDeviceURL("patchState"), &body, base::Bind(&DeviceRegistrationInfo::OnPublishStateDone, AsWeakPtr(), snapshot.update_id)); } void DeviceRegistrationInfo::OnPublishStateDone( ComponentManager::UpdateID update_id, const base::DictionaryValue& reply, ErrorPtr error) { device_state_update_pending_ = false; if (error) { LOG(ERROR) << "Permanent 
failure while trying to update device state"; return; } component_manager_->NotifyStateUpdatedOnServer(update_id); // See if there were more pending state updates since the previous request // had been sent out. PublishStateUpdates(); } void DeviceRegistrationInfo::SetGcdState(GcdState new_state) { VLOG_IF(1, new_state != gcd_state_) << "Changing registration status to " << EnumToString(new_state); gcd_state_ = new_state; for (const auto& cb : gcd_state_changed_callbacks_) cb.Run(gcd_state_); } void DeviceRegistrationInfo::OnTraitDefsChanged() { VLOG(1) << "CommandDefinitionChanged notification received"; if (!HaveRegistrationCredentials() || !connected_to_cloud_) return; UpdateDeviceResource(base::Bind(&IgnoreCloudError)); } void DeviceRegistrationInfo::OnStateChanged() { VLOG(1) << "StateChanged notification received"; if (!HaveRegistrationCredentials() || !connected_to_cloud_) return; // TODO(vitalybuka): Integrate BackoffEntry. PublishStateUpdates(); } void DeviceRegistrationInfo::OnComponentTreeChanged() { VLOG(1) << "ComponentTreeChanged notification received"; if (!HaveRegistrationCredentials() || !connected_to_cloud_) return; UpdateDeviceResource(base::Bind(&IgnoreCloudError)); } void DeviceRegistrationInfo::OnConnected(const std::string& channel_name) { LOG(INFO) << "Notification channel successfully established over " << channel_name; CHECK_EQ(primary_notification_channel_->GetName(), channel_name); notification_channel_starting_ = false; pull_channel_->UpdatePullInterval( base::TimeDelta::FromMinutes(kBackupPollingPeriodMinutes)); current_notification_channel_ = primary_notification_channel_.get(); // If we have not successfully connected to the cloud server and we have not // initiated the first device resource update, there is nothing we need to // do now to update the server of the notification channel change. 
if (!connected_to_cloud_ && in_progress_resource_update_callbacks_.empty()) return; // Once we update the device resource with the new notification channel, // do the last poll for commands from the server, to make sure we have the // latest command baseline and no other commands have been queued between // the moment of the last poll and the time we successfully told the server // to send new commands over the new notification channel. UpdateDeviceResource( base::Bind(&IgnoreCloudErrorWithCallback, base::Bind(&DeviceRegistrationInfo::FetchAndPublishCommands, AsWeakPtr(), fetch_reason::kRegularPull))); } void DeviceRegistrationInfo::OnDisconnected() { LOG(INFO) << "Notification channel disconnected"; if (!HaveRegistrationCredentials() || !connected_to_cloud_) return; pull_channel_->UpdatePullInterval( base::TimeDelta::FromSeconds(kPollingPeriodSeconds)); current_notification_channel_ = pull_channel_.get(); UpdateDeviceResource(base::Bind(&IgnoreCloudError)); } void DeviceRegistrationInfo::OnPermanentFailure() { LOG(ERROR) << "Failed to establish notification channel."; notification_channel_starting_ = false; RefreshAccessToken( base::Bind(&DeviceRegistrationInfo::CheckAccessTokenError, AsWeakPtr())); } void DeviceRegistrationInfo::OnCommandCreated( const base::DictionaryValue& command, const std::string& channel_name) { if (!connected_to_cloud_) return; VLOG(1) << "Command notification received: " << command; if (!command.empty()) { // GCD spec indicates that the command parameter in notification object // "may be empty if command size is too big". PublishCommand(command); return; } // If this request comes from a Pull channel while the primary notification // channel (XMPP) is active, we are doing a backup poll, so mark the request // appropriately. bool just_in_case = (channel_name == kPullChannelName) && (current_notification_channel_ == primary_notification_channel_.get()); std::string reason = just_in_case ? 
fetch_reason::kJustInCase : fetch_reason::kNewCommand; // If the command was too big to be delivered over a notification channel, // or OnCommandCreated() was initiated from the Pull notification, // perform a manual command fetch from the server here. FetchAndPublishCommands(reason); } void DeviceRegistrationInfo::OnDeviceDeleted(const std::string& cloud_id) { if (cloud_id != GetSettings().cloud_id) { LOG(WARNING) << "Unexpected device deletion notification for cloud ID '" << cloud_id << "'"; return; } RemoveCredentials(); } void DeviceRegistrationInfo::RemoveCredentials() { if (!HaveRegistrationCredentials()) return; connected_to_cloud_ = false; LOG(INFO) << "Device is unregistered from the cloud. Deleting credentials"; if (auth_manager_) auth_manager_->SetAuthSecret({}, RootClientTokenOwner::kNone); Config::Transaction change{config_}; // Keep cloud_id to switch to detect kInvalidCredentials after restart. change.set_robot_account(""); change.set_refresh_token(""); change.Commit(); current_notification_channel_ = nullptr; if (primary_notification_channel_) { primary_notification_channel_->Stop(); primary_notification_channel_.reset(); } if (pull_channel_) { pull_channel_->Stop(); pull_channel_.reset(); } notification_channel_starting_ = false; SetGcdState(GcdState::kInvalidCredentials); } } // namespace weave