C++程序  |  166行  |  5.58 KB

// Copyright (C) 2019 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef IORAP_SRC_PERFETTO_RX_PRODUCER_H_
#define IORAP_SRC_PERFETTO_RX_PRODUCER_H_

#include "perfetto/perfetto_consumer.h"       // libiorap

#include <perfetto/config/trace_config.pb.h>  // libperfetto
#include <rxcpp/rx.hpp>

#include <cstddef>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <limits>
#include <optional>
#include <string>
#include <utility>
#include <vector>

namespace iorap::perfetto {

// Groups the fruit dependency-injection aliases and the factory helpers for
// the two types every alias below is parameterized on: PerfettoConsumer and
// ::perfetto::protos::TraceConfig.
struct PerfettoDependencies {
  // fruit::Component over PerfettoConsumer and TraceConfig.
  using Component =
      fruit::Component<PerfettoConsumer, ::perfetto::protos::TraceConfig>;
  // fruit::Injector over the same two types.
  using Injector =
      fruit::Injector<PerfettoConsumer, ::perfetto::protos::TraceConfig>;
  // fruit::NormalizedComponent over the same two types.
  using NormalizedComponent =
      fruit::NormalizedComponent<PerfettoConsumer, ::perfetto::protos::TraceConfig>;

  // Create a 'live' component that will talk to perfetto via traced.
  static Component CreateComponent(/*TODO: config params*/);

  // Create perfetto.protos.TraceConfig , serialized as a (machine-readable) string.
  //
  // NOTE(review): the declared return type is a TraceConfig message object,
  // not a serialized string, so the sentence above looks stale -- confirm
  // against the definition.
  //
  // The following ftrace events are enabled:
  // * mm_filemap_add_to_page_cache
  // * mm_filemap_delete_from_page_cache
  //
  // If deferred starting is also enabled, no tracing will begin until
  // ::perfetto::consumer::StartTracing is invoked.
  //
  // Parameters:
  //   duration_ms    - trace duration, in milliseconds (per the name).
  //   deferred_start - whether deferred starting is enabled; defaults to true.
  //   buffer_size    - trace buffer size; defaults to 4096. The unit is not
  //                    visible in this header (presumably KB -- TODO confirm).
  static ::perfetto::protos::TraceConfig CreateConfig(uint32_t duration_ms,
                                                      bool deferred_start = true,
                                                      uint32_t buffer_size = 4096);
};

// This acts as a lightweight type marker so that we know what data is
// actually encoded under the hood: an owned byte buffer tagged with the
// protobuf message type 'T' it is expected to hold.
//
// 'T' is not referenced by any member declared here; it only makes
// BinaryWireProtobuf<A> and BinaryWireProtobuf<B> distinct types.
template <typename T>
struct BinaryWireProtobuf {
  // Mutable access to the owned bytes.
  std::vector<std::byte>& data() {
    return data_;
  }

  // Read-only access to the owned bytes.
  const std::vector<std::byte>& data() const {
    return data_;
  }

  // Number of bytes currently held.
  size_t size() const {
    return data_.size();
  }

  // Copies 'size' bytes starting at 'data' into a newly owned buffer.
  //
  // The source is only read, so it is taken as pointer-to-const; callers that
  // pass a non-const 'char*' keep working unchanged.
  explicit BinaryWireProtobuf(const char* data, size_t size)
    : BinaryWireProtobuf(reinterpret_cast<const std::byte*>(data), size) {
  }

  // Copies 'size' bytes starting at 'data' into a newly owned buffer.
  //
  // [data, data + size) must be a valid range; an empty range (size == 0)
  // yields an empty buffer.
  explicit BinaryWireProtobuf(const std::byte* data, size_t size)
    : data_(data, data + size) {
  }

  // Parses the held bytes into a fresh 'U' via U::ParseFromArray.
  //
  // Important: Deserialization could fail, for example data is truncated or
  // some minor disc corruption occurred.
  //
  // Returns std::nullopt when parsing fails, or when the buffer is larger
  // than ParseFromArray's 'int' length parameter can express. The held bytes
  // are never modified, hence 'const'.
  template <typename U>
  std::optional<U> MaybeUnserialize() const {
    // ParseFromArray takes its length as 'int'; refuse anything that would be
    // silently truncated (or turned negative) by the conversion.
    constexpr size_t kMaxParseSize =
        static_cast<size_t>(std::numeric_limits<int>::max());
    if (data_.size() > kMaxParseSize) {
      return std::nullopt;
    }

    U unencoded;

    if (!unencoded.ParseFromArray(data_.data(), static_cast<int>(data_.size()))) {
      return std::nullopt;
    }

    return {std::move(unencoded)};
  }

  // Writes the held bytes to the file at 'path'; returns whether it succeeded
  // (per the name and return type -- the definition is not in this header).
  //
  // 'follow_symlinks' defaults to false; its exact effect is determined by
  // the out-of-line definition.
  bool WriteFullyToFile(const std::string& path,
                        bool follow_symlinks = false) const;

 private:
  // Helpers for WriteFullyToFile (judging by their names); both are defined
  // out of line.
  static bool CleanUpAfterFailedWrite(const std::string& path);
  bool WriteStringToFd(int fd) const;

  // The owned bytes.
  std::vector<std::byte> data_;
};

// Byte buffer that is expected to hold a perfetto trace.
//
// The active alias tags the buffer with the generic
// ::google::protobuf::MessageLite type; the alias tagged with the specific
// ::perfetto::protos::Trace type is kept below, commented out.
//using PerfettoTraceProto = BinaryWireProtobuf<::perfetto::protos::Trace>;
using PerfettoTraceProto = BinaryWireProtobuf<::google::protobuf::MessageLite>;

// Commands that drive a perfetto trace stream.
//
// The '->' note after each enumerator is the author's shorthand for the
// observer callback(s) expected in response to that command; '| on_error'
// means the command may end in an error instead.
enum class PerfettoStreamCommand {
  kStartTracing, // -> () | on_error
  kStopTracing,  // -> on_next(PerfettoTraceProto) | on_error
  kShutdown,     // -> on_completed | on_error
  // XX: should shutdown be converted to use the rx subscriber#unsubscribe instead?
};

// Stream-insertion overload for PerfettoStreamCommand. The text written for
// each enumerator is decided by the definition, which is not in this header.
std::ostream& operator<<(std::ostream& os, PerfettoStreamCommand c);

// Factory for rx observables that produce perfetto traces, built on top of a
// PerfettoDependencies::Injector supplied by the caller.
struct RxProducerFactory {
  // Passing anything by value leads to a lot of pain and headache.
  // Pass in the injector by reference because nothing else seems to work.
  //
  // This struct keeps an Injector& member (injector_), so the injector
  // presumably has to outlive the factory -- confirm against the definition.
  explicit RxProducerFactory(PerfettoDependencies::Injector& injector);

  // Create a one-shot perfetto observable that will begin
  // asynchronously producing a PerfettoTraceProto after the 'kStartTracing'
  // command is observed.
  //
  // libperfetto is immediately primed (i.e. connected in a deferred state)
  // upon calling this function, to reduce the latency of 'kStartTracing'.
  //
  // To finish the trace, push 'kStopTracing'. To cancel or tear down at any
  // time, push 'kShutdown'.
  //
  // The TraceProto may come out at any time after 'kStartTracing',
  // this is controlled by duration_ms in the TraceConfig.
  //
  // TODO: libperfetto should actually stop tracing when we ask it to,
  // instead of using a hardcoded time.
  //
  // The observable may go into #on_error at any time, if the underlying
  // libperfetto states transition to a failing state.
  // This usually means the OS is not configured correctly.
  //
  // 'commands' is the observable of PerfettoStreamCommand values that drives
  // the returned trace stream.
  rxcpp::observable<PerfettoTraceProto> CreateTraceStream(
      rxcpp::observable<PerfettoStreamCommand> commands);

  // TODO: is this refactor-able into a subscriber factory that takes
  // the commands-observable as a parameter?

  // TODO: infinite perfetto stream.

 private:
  // XX: why doesn't this just let me pass in a regular Component?
  //
  // Non-owning reference to the caller's injector.
  PerfettoDependencies::Injector& injector_;

  // Friend so that this free function can reach the private members above.
  friend void CollectPerfettoTraceBufferImmediately(
      RxProducerFactory& producer_factory,
      const std::string& arg_output_proto);
};

// An rx Coordination, which will cause a new thread to spawn for each new Worker.
//
// Idle-class priority is set for the CPU and IO priorities on the new thread.
//
// Returned by value as an rxcpp::observe_on_one_worker. Only the declaration
// lives in this header.
//
// TODO: move to separate file
rxcpp::observe_on_one_worker ObserveOnNewIoThread();

}  // namespace iorap::perfetto
#endif  // IORAP_SRC_PERFETTO_RX_PRODUCER_H_