/*
* Copyright (C) 2019 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "common/debug.h"
#include "common/expected.h"
#include "manager/event_manager.h"
#include "perfetto/rx_producer.h"
#include <android-base/properties.h>
#include <rxcpp/rx.hpp>
#include <atomic>
#include <functional>
using rxcpp::observe_on_one_worker;
namespace iorap::manager {
using binder::RequestId;
using binder::AppLaunchEvent;
using perfetto::PerfettoStreamCommand;
using perfetto::PerfettoTraceProto;
// An application component identified by its package and activity name.
//
// The textual form is "package/activity_name", e.g. "com.foo.bar/.A".
// This type is intentionally kept an aggregate so that it can be
// brace-initialized as AppComponentName{package, activity_name}.
struct AppComponentName {
  std::string package;
  std::string activity_name;

  // Returns true if 's' contains the '/' separating package from activity name.
  static bool HasAppComponentName(const std::string& s) {
    return s.find('/') != std::string::npos;
  }

  // "com.foo.bar/.A" -> {"com.foo.bar", ".A"}
  //
  // The string is split at the first '/'. If 's' has no '/', the whole
  // string becomes the package and the activity name is left empty.
  static AppComponentName FromString(const std::string& s) {
    constexpr char delimiter = '/';

    // Split on the *position* of the delimiter (not on its character value).
    const std::string::size_type delimiter_pos = s.find(delimiter);
    if (delimiter_pos == std::string::npos) {
      return {s, std::string{}};
    }

    return {s.substr(0, delimiter_pos), s.substr(delimiter_pos + 1)};
  }

  // {"com.foo.bar", ".A"} -> "com.foo.bar/.A"
  std::string ToString() const {
    return package + "/" + activity_name;
  }

  /*
   * '/' is encoded into %2F
   * '%' is encoded into %25
   *
   * This allows the component name to be used as a file name
   * ('/' is illegal due to being a path separator) with minimal
   * munging.
   */

  // "com.foo.bar%2F.A%25" -> {"com.foo.bar", ".A%"}
  static AppComponentName FromUrlEncodedString(const std::string& s) {
    std::string cpy = s;
    // Decode "%2F" before "%25": a literal "%2F" in the original name is
    // stored as "%252F", which must decode to "%2F" and not to "/".
    Replace(cpy, "%2F", "/");
    Replace(cpy, "%25", "%");
    return FromString(cpy);
  }

  // {"com.foo.bar", ".A%"} -> "com.foo.bar%2F.A%25"
  std::string ToUrlEncodedString() const {
    std::string s = ToString();
    // Escape '%' first so that the '%' introduced by "%2F" is not re-escaped.
    Replace(s, "%", "%25");
    Replace(s, "/", "%2F");
    return s;
  }

 private:
  // Replaces every occurrence of 'from' in 'str' with 'to', scanning left to right.
  //
  // Text inserted by a replacement is never re-scanned, so 'to' may itself
  // contain 'from' (e.g. "%" -> "%25") without looping forever.
  //
  // Returns true if at least one replacement was made. An empty 'from'
  // matches nothing.
  static bool Replace(std::string& str, const std::string& from, const std::string& to) {
    if (from.empty()) {
      return false;
    }

    bool replaced_any = false;
    std::string::size_type pos = 0;
    while ((pos = str.find(from, pos)) != std::string::npos) {
      str.replace(pos, from.length(), to);
      pos += to.length();  // resume after the text we just inserted.
      replaced_any = true;
    }
    return replaced_any;
  }
};
// Streams the component in its "package/activity_name" textual form.
std::ostream& operator<<(std::ostream& os, const AppComponentName& name) {
  return os << name.ToString();
}
// Main logic of the #OnAppLaunchEvent scan method.
//
// All functions are called from the same thread as the event manager
// functions.
//
// This is a data type, it's moved (std::move) around from one iteration
// of #scan to another.
//
// State overview:
//   - is_tracing_ / rx_lifetime_ describe the trace for the launch sequence
//     currently in progress (if any).
//   - rx_in_flight_ holds subscriptions for launches that already finished
//     but whose trace we still want written out.
struct AppLaunchEventState {
  // Component of the most recent launch we started tracing for.
  std::optional<AppComponentName> component_name_;
  // True from StartTracing until the trace is aborted or marked pending.
  bool is_tracing_{false};

  // Subscription of the trace for the current launch sequence.
  std::optional<rxcpp::composite_subscription> rx_lifetime_;
  // Subscriptions of traces that were handed off by MarkPendingTrace.
  std::vector<rxcpp::composite_subscription> rx_in_flight_;

  borrowed<perfetto::RxProducerFactory*> perfetto_factory_;  // not null
  borrowed<observe_on_one_worker*> thread_;  // not null
  borrowed<observe_on_one_worker*> io_thread_;  // not null

  // All three pointers are borrowed (not owned) and must be non-null.
  explicit AppLaunchEventState(borrowed<perfetto::RxProducerFactory*> perfetto_factory,
                               borrowed<observe_on_one_worker*> thread,
                               borrowed<observe_on_one_worker*> io_thread) {
    perfetto_factory_ = perfetto_factory;
    DCHECK(perfetto_factory_ != nullptr);

    thread_ = thread;
    DCHECK(thread_ != nullptr);

    io_thread_ = io_thread;
    DCHECK(io_thread_ != nullptr);
  }

  // Updates the values in this struct only as a side effect.
  //
  // May create and fire a new rx chain on the same threads as passed
  // in by the constructors.
  void OnNewEvent(const AppLaunchEvent& event) {
    LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: " << event;

    using Type = AppLaunchEvent::Type;

    switch (event.type) {
      case Type::kIntentStarted: {
        DCHECK(!IsTracing());
        // Optimistically start tracing if we have the activity in the intent.
        if (!event.intent_proto->has_component()) {
          // Can't do anything if there is no component in the proto.
          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: no component, can't trace";
          break;
        }

        const std::string& package_name = event.intent_proto->component().package_name();
        const std::string& class_name = event.intent_proto->component().class_name();
        AppComponentName component_name{package_name, class_name};

        component_name_ = component_name;
        rx_lifetime_ = StartTracing(std::move(component_name));

        break;
      }
      case Type::kIntentFailed:
        AbortTrace();
        break;
      case Type::kActivityLaunched: {
        // Cancel tracing for warm/hot.
        // Restart tracing if the activity was unexpected.

        AppLaunchEvent::Temperature temperature = event.temperature;
        if (temperature != AppLaunchEvent::Temperature::kCold) {
          LOG(DEBUG) << "AppLaunchEventState#OnNewEvent aborting trace due to non-cold temperature";
          AbortTrace();
        } else if (!IsTracing()) {  // and the temperature is Cold.
          // Start late trace when intent didn't have a component name
          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent need to start new trace";

          const std::string& title = event.activity_record_proto->identifier().title();
          if (!AppComponentName::HasAppComponentName(title)) {
            // Proto comment claim this is sometimes a window title.
            // We need the actual 'package/component' here, so just ignore it if it's a title.
            LOG(WARNING) << "App launched without a component name: " << event;
            break;
          }

          AppComponentName component_name = AppComponentName::FromString(title);

          component_name_ = component_name;
          rx_lifetime_ = StartTracing(std::move(component_name));
        } else {
          // FIXME: match actual component name against intent component name.
          // abort traces if they don't match.

          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already tracing";
        }
        break;
      }
      case Type::kActivityLaunchFinished:
        // Finish tracing and collect trace buffer.
        //
        // TODO: this happens automatically when perfetto finishes its
        // trace duration.
        if (IsTracing()) {
          MarkPendingTrace();
        }
        break;
      case Type::kActivityLaunchCancelled:
        // Abort tracing.
        AbortTrace();
        break;
      default:
        DCHECK(false) << "invalid type: " << event;  // binder layer should've rejected this.
        LOG(ERROR) << "invalid type: " << event;  // binder layer should've rejected this.
    }
  }

  // Whether a trace for the current launch sequence is considered active.
  bool IsTracing() const {
    return is_tracing_;
  }

  // Builds and subscribes the perfetto trace stream for 'component_name'.
  //
  // Every trace proto received is written to
  //   /data/misc/iorapd/<url-encoded component name>.perfetto_trace.pb
  //
  // Sets is_tracing_ and returns the subscription controlling the chain.
  rxcpp::composite_subscription StartTracing(AppComponentName component_name) {
    DCHECK(!IsTracing());

    // Command sequence: kStartTracing immediately, then two more commands
    // driven by a 10 second interval timer.
    auto /*observable<PerfettoStreamCommand>*/ perfetto_commands =
      rxcpp::observable<>::just(PerfettoStreamCommand::kStartTracing)
          // wait 1x
          .concat(
              // Pick a value longer than the perfetto config delay_ms, so that we send
              // 'kShutdown' after tracing has already finished.
              rxcpp::observable<>::interval(std::chrono::milliseconds(10000))
                  .take(2)  // kStopTracing, kShutdown.
                  .map([](int value) {
                         // value is 1,2,3,...
                         return static_cast<PerfettoStreamCommand>(value);  // 1,2, ...
                       })
          );

    auto /*observable<PerfettoTraceProto>*/ trace_proto_stream =
        perfetto_factory_->CreateTraceStream(perfetto_commands);
    // This immediately connects to perfetto asynchronously.
    //
    // TODO: create a perfetto handle earlier, to minimize perfetto startup latency.

    rxcpp::composite_subscription lifetime;

    // NOTE(review): as_blocking() looks like it makes subscribe() wait for the
    // stream to terminate before returning — confirm against rxcpp docs.
    trace_proto_stream
      .tap([](const PerfettoTraceProto& trace_proto) {
             LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (1)";
           })
      .observe_on(*thread_)   // All work prior to 'observe_on' is handled on thread_.
      .subscribe_on(*thread_)   // All work prior to 'observe_on' is handled on thread_.
      .observe_on(*io_thread_)  // Write data on an idle-class-priority thread.
      .tap([](const PerfettoTraceProto& trace_proto) {
             LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (2)";
           })
      .as_blocking()  // TODO: remove.
      .subscribe(/*out*/lifetime,
                 /*on_next*/[component_name]
                 (PerfettoTraceProto trace_proto) {
                   std::string file_path = "/data/misc/iorapd/";
                   file_path += component_name.ToUrlEncodedString();
                   file_path += ".perfetto_trace.pb";

                   // TODO: timestamp each file into a subdirectory.

                   if (!trace_proto.WriteFullyToFile(file_path)) {
                     LOG(ERROR) << "Failed to save TraceBuffer to " << file_path;
                   } else {
                     LOG(INFO) << "Perfetto TraceBuffer saved to file: " << file_path;
                   }
                 },
                 /*on_error*/[](rxcpp::util::error_ptr err) {
                   LOG(ERROR) << "Perfetto trace proto collection error: " << rxcpp::util::what(err);
                 });

    is_tracing_ = true;

    return lifetime;
  }

  // Stops the current trace (if any) by unsubscribing from it.
  void AbortTrace() {
    LOG(VERBOSE) << "AppLaunchEventState - AbortTrace";
    is_tracing_ = false;
    if (rx_lifetime_) {
      // TODO: it would be good to call perfetto Destroy.

      LOG(VERBOSE) << "AppLaunchEventState - AbortTrace - Unsubscribe";
      rx_lifetime_->unsubscribe();
      rx_lifetime_.reset();
    }
  }

  // Hands the current trace subscription over to rx_in_flight_ so it can
  // run to completion, and marks this state as no longer tracing so that
  // the next launch sequence can start a fresh trace.
  void MarkPendingTrace() {
    LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace";
    DCHECK(is_tracing_);
    DCHECK(rx_lifetime_.has_value());

    if (rx_lifetime_) {
      LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime moved";
      // Don't unsubscribe because that would cause the perfetto TraceBuffer
      // to get dropped on the floor.
      //
      // Instead, we want to let it finish and write it out to a file.
      rx_in_flight_.push_back(*std::move(rx_lifetime_));
      rx_lifetime_.reset();
    } else {
      LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime was empty";
    }

    // The launch sequence is over. Without this reset IsTracing() would stay
    // true forever, so later cold launches would be treated as
    // "already tracing" and never get a trace of their own.
    is_tracing_ = false;

    // FIXME: how do we clear this vector?
  }
};
// Bridges the callback-style API (OnNext/OnCompleted) to a reactive stream:
// events pushed in here are forwarded to whichever rx subscriber was
// registered through Subscribe.
struct AppLaunchEventSubject {
  using RefWrapper =
      std::reference_wrapper<const AppLaunchEvent>;

  AppLaunchEventSubject() = default;

  // Registers the subscriber that will receive all subsequent events.
  // Must be called at most once.
  void Subscribe(rxcpp::subscriber<RefWrapper> subscriber) {
    DCHECK(!ready_.load()) << "Cannot Subscribe twice";

    // Publish the subscriber first, then flip the flag:
    // the store is the release edge that AcquireIsReady synchronizes with.
    subscriber_ = std::move(subscriber);
    ready_.store(true);
  }

  // Forwards 'e' to the subscriber. The event is dropped when nobody has
  // subscribed yet or the subscriber is no longer subscribed.
  void OnNext(const AppLaunchEvent& e) {
    // Short-circuit keeps subscriber_ untouched until the flag is observed.
    if (!AcquireIsReady() || !subscriber_->is_subscribed()) {
      return;
    }

    /*
     * TODO: fix upstream.
     *
     * Rx workaround: the event is wrapped in a reference_wrapper rather than
     * passed as 'const AppLaunchEvent&', because an observable of a
     * reference type fails to compile:
     *
     * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:354:18: error: multiple overloads of 'on_next' instantiate to the same signature 'void (const iorap::binder::AppLaunchEvent &) const'
     *   virtual void on_next(T&&) const {};
     *
     * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:353:18: note: previous declaration is here
     *   virtual void on_next(T&) const {};
     */
    subscriber_->on_next(std::cref(e));
  }

  // Signals end-of-stream to the subscriber, if one has been registered.
  void OnCompleted() {
    if (AcquireIsReady()) {
      subscriber_->on_completed();
    }
  }

 private:
  // Returns true once Subscribe has published a subscriber.
  bool AcquireIsReady() {
    // Acquire side of the release edge in Subscribe.
    //
    // The subscription may happen much later than construction. As far as
    // the original author knows, 'rxcpp::subscriber' is not thread safe
    // (although the observable chain itself can be made thread-safe via
    // #observe_on, etc), so subscriber_ must not be read until this flag
    // has been observed as set.
    //
    // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics,
    // to make it simpler.
    return ready_.load();
  }

  // TODO: also track the RequestId ?

  std::atomic<bool> ready_{false};
  std::optional<rxcpp::subscriber<RefWrapper>> subscriber_;
};
// Private implementation of EventManager: owns the worker threads and the
// rx graph that turns app launch events into perfetto traces.
class EventManager::Impl {
 public:
  // Borrows 'perfetto_factory'. The rx graph is only built when the
  // "iorapd.perfetto.enable" property reads as true.
  Impl(/*borrow*/perfetto::RxProducerFactory& perfetto_factory)
      : perfetto_factory_(perfetto_factory),
        worker_thread_(rxcpp::observe_on_new_thread()),
        worker_thread2_(rxcpp::observe_on_new_thread()),
        io_thread_(perfetto::ObserveOnNewIoThread()) {
    // TODO: read all properties from one config class.
    tracing_allowed_ = ::android::base::GetBoolProperty("iorapd.perfetto.enable", /*default*/false);

    if (!tracing_allowed_) {
      LOG(WARNING) << "Tracing disabled by iorapd.perfetto.enable=false";
      return;
    }

    rx_lifetime_ = InitializeRxGraph();
  }

  // Pushes 'event' into the rx graph. Always returns true.
  bool OnAppLaunchEvent(RequestId request_id,
                        const AppLaunchEvent& event) {
    LOG(VERBOSE) << "EventManager::OnAppLaunchEvent("
                 << "request_id=" << request_id.request_id << ","
                 << event;

    app_launch_event_subject_.OnNext(event);

    return true;
  }

  // Wires app_launch_event_subject_ into an observable and folds every
  // event into an AppLaunchEventState via #scan.
  //
  // Returns the subscription that controls the graph.
  rxcpp::composite_subscription InitializeRxGraph() {
    LOG(VERBOSE) << "EventManager::InitializeRxGraph";

    // On subscription, hand the rx subscriber to the subject so that
    // OnAppLaunchEvent callbacks get forwarded into the stream.
    app_launch_events_ = rxcpp::observable<>::create<AppLaunchEventRefWrapper>(
        [this](rxcpp::subscriber<AppLaunchEventRefWrapper> subscriber) {
          app_launch_event_subject_.Subscribe(std::move(subscriber));
        });

    rxcpp::composite_subscription graph_lifetime;

    AppLaunchEventState seed_state{&perfetto_factory_, &worker_thread2_, &io_thread_};
    app_launch_events_
        .subscribe_on(worker_thread_)
        .scan(std::move(seed_state),
              [](AppLaunchEventState state, AppLaunchEventRefWrapper event) {
                state.OnNewEvent(event.get());
                return state;
              })
        .subscribe(/*out*/graph_lifetime, [](const AppLaunchEventState& /*state*/) {
          // Intentionally left blank.
        });

    return graph_lifetime;
  }

  perfetto::RxProducerFactory& perfetto_factory_;
  bool tracing_allowed_{true};

  using AppLaunchEventRefWrapper = AppLaunchEventSubject::RefWrapper;
  rxcpp::observable<AppLaunchEventRefWrapper> app_launch_events_;
  AppLaunchEventSubject app_launch_event_subject_;

  rxcpp::observable<RequestId> completed_requests_;

  // regular-priority thread to handle binder callbacks.
  observe_on_one_worker worker_thread_;
  observe_on_one_worker worker_thread2_;
  // low priority idle-class thread for IO operations.
  observe_on_one_worker io_thread_;

  rxcpp::composite_subscription rx_lifetime_;

//INTENTIONAL_COMPILER_ERROR_HERE:
  // FIXME:
  // ok so we want to expose a 'BlockingSubscribe' or a 'Subscribe' or some kind of function
  // that the main thread can call. This would subscribe on all the observables we internally
  // have here (probably on an event-manager-dedicated thread for simplicity).
  //
  // ideally we'd just reuse the binder thread to handle the events but I'm not super sure,
  // maybe this already works with the identity_current_thread coordination?
};
using Impl = EventManager::Impl;
EventManager::EventManager(perfetto::RxProducerFactory& perfetto_factory)
: impl_(new Impl(perfetto_factory)) {}
std::shared_ptr<EventManager> EventManager::Create() {
static perfetto::PerfettoDependencies::Injector injector{
perfetto::PerfettoDependencies::CreateComponent
};
static perfetto::RxProducerFactory producer_factory{
/*borrow*/injector
};
return EventManager::Create(/*borrow*/producer_factory);
}
// Creates an EventManager that borrows the caller-supplied 'perfetto_factory'.
std::shared_ptr<EventManager> EventManager::Create(perfetto::RxProducerFactory& perfetto_factory) {
  return std::shared_ptr<EventManager>{new EventManager{/*borrow*/perfetto_factory}};
}
// Forwards the app launch event to the private implementation and
// returns whatever it reports.
bool EventManager::OnAppLaunchEvent(RequestId request_id,
                                    const AppLaunchEvent& event) {
  const bool handled = impl_->OnAppLaunchEvent(request_id, event);
  return handled;
}
} // namespace iorap::manager