// Copyright (C) 2019 The Android Open Source Project // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. #ifndef IORAP_SRC_PERFETTO_RX_PRODUCER_H_ #define IORAP_SRC_PERFETTO_RX_PRODUCER_H_ #include "perfetto/perfetto_consumer.h" // libiorap #include <perfetto/config/trace_config.pb.h> // libperfetto #include <rxcpp/rx.hpp> #include <iosfwd> #include <functional> #include <optional> #include <vector> namespace iorap::perfetto { struct PerfettoDependencies { using Component = fruit::Component<PerfettoConsumer, ::perfetto::protos::TraceConfig>; using Injector = fruit::Injector<PerfettoConsumer, ::perfetto::protos::TraceConfig>; using NormalizedComponent = fruit::NormalizedComponent<PerfettoConsumer, ::perfetto::protos::TraceConfig>; // Create a 'live' component that will talk to perfetto via traced. static Component CreateComponent(/*TODO: config params*/); // Create perfetto.protos.TraceConfig , serialized as a (machine-readable) string. // // The following ftrace events are enabled: // * mm_filemap_add_to_page_cache // * mm_filemap_delete_from_page_cache // // If deferred starting is also enabled, no tracing will begin until // ::perfetto::consumer::StartTracing is invoked. static ::perfetto::protos::TraceConfig CreateConfig(uint32_t duration_ms, bool deferred_start = true, uint32_t buffer_size = 4096); }; // This acts as a lightweight type marker so that we know what data has actually // encoded under the hood. template <typename T> struct BinaryWireProtobuf { std::vector<std::byte>& data() { return data_; } const std::vector<std::byte>& data() const { return data_; } size_t size() const { return data_.size(); } explicit BinaryWireProtobuf(char* data, size_t size) : BinaryWireProtobuf(reinterpret_cast<std::byte*>(data), size) { } explicit BinaryWireProtobuf(std::byte* data, size_t size) { data_.resize(size); std::copy(data, data + size, data_.data()); } // Important: Deserialization could fail, for example data is truncated or // some minor disc corruption occurred. template <typename U> std::optional<U> MaybeUnserialize() { U unencoded; if (!unencoded.ParseFromArray(data_.data(), data_.size())) { return std::nullopt; } return {std::move(unencoded)}; } bool WriteFullyToFile(const std::string& path, bool follow_symlinks = false) const; private: static bool CleanUpAfterFailedWrite(const std::string& path); bool WriteStringToFd(int fd) const; std::vector<std::byte> data_; }; //using PerfettoTraceProto = BinaryWireProtobuf<::perfetto::protos::Trace>; using PerfettoTraceProto = BinaryWireProtobuf<::google::protobuf::MessageLite>; enum class PerfettoStreamCommand { kStartTracing, // -> () | on_error kStopTracing, // -> on_next(PerfettoTraceProto) | on_error kShutdown, // -> on_completed | on_error // XX: should shutdown be converted to use the rx suscriber#unsubscribe instead? }; std::ostream& operator<<(std::ostream& os, PerfettoStreamCommand c); struct RxProducerFactory { // Passing anything by value leads to a lot of pain and headache. // Pass in the injector by reference because nothing else seems to work. explicit RxProducerFactory(PerfettoDependencies::Injector& injector); // Create a one-shot perfetto observable that will begin // asynchronously producing a PerfettoTraceProto after the 'kStartTracing' // command is observed. // // libperfetto is immediately primed (i.e. connected in a deferred state) // upon calling this function, to reduce the latency of 'kStartTracing'. // // To finish the trace, push 'kStopTracing'. To cancel or tear down at any // time, push 'kShutdown'. // // The TraceProto may come out at any time after 'kStartTracing', // this is controlled by duration_ms in the TraceConfig. // // TODO: libperfetto should actually stop tracing when we ask it to, // instead of using a hardcoded time. // // The observable may go into #on_error at any time, if the underlying // libperfetto states transition to a failing state. // This usually means the OS is not configured correctly. rxcpp::observable<PerfettoTraceProto> CreateTraceStream( rxcpp::observable<PerfettoStreamCommand> commands); // TODO: is this refactor-able into a subscriber factory that takes // the commands-observable as a parameter? // TODO: infinite perfetto stream. private: // XX: why doesn't this just let me pass in a regular Component? PerfettoDependencies::Injector& injector_; friend void CollectPerfettoTraceBufferImmediately( RxProducerFactory& producer_factory, const std::string& arg_output_proto); }; // An rx Coordination, which will cause a new thread to spawn for each new Worker. // // Idle-class priority is set for the CPU and IO priorities on the new thread. // // TODO: move to separate file rxcpp::observe_on_one_worker ObserveOnNewIoThread(); } // namespace iorap::perfetto #endif // IORAP_SRC_PERFETTO_RX_PRODUCER_H_