普通文本  |  497行  |  16.42 KB

/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "common/debug.h"
#include "common/expected.h"
#include "manager/event_manager.h"
#include "perfetto/rx_producer.h"

#include <android-base/properties.h>
#include <rxcpp/rx.hpp>

#include <atomic>
#include <functional>

using rxcpp::observe_on_one_worker;

namespace iorap::manager {

using binder::RequestId;
using binder::AppLaunchEvent;
using perfetto::PerfettoStreamCommand;
using perfetto::PerfettoTraceProto;

struct AppComponentName {
  std::string package;
  std::string activity_name;

  static bool HasAppComponentName(const std::string& s) {
    return s.find('/') != std::string::npos;
  }

  // "com.foo.bar/.A" -> {"com.foo.bar", ".A"}
  static AppComponentName FromString(const std::string& s) {
    constexpr const char delimiter = '/';
    std::string package = s.substr(0, delimiter);

    std::string activity_name = s;
    activity_name.erase(0, s.find(delimiter) + sizeof(delimiter));

    return {std::move(package), std::move(activity_name)};
  }

  // {"com.foo.bar", ".A"} -> "com.foo.bar/.A"
  std::string ToString() const {
    return package + "/" + activity_name;
  }

  /*
   * '/' is encoded into %2F
   * '%' is encoded into %25
   *
   * This allows the component name to be be used as a file name
   * ('/' is illegal due to being a path separator) with minimal
   * munging.
   */

  // "com.foo.bar%2F.A%25" -> {"com.foo.bar", ".A%"}
  static AppComponentName FromUrlEncodedString(const std::string& s) {
    std::string cpy = s;
    Replace(cpy, "%2F", "/");
    Replace(cpy, "%25", "%");

    return FromString(cpy);
  }

  // {"com.foo.bar", ".A%"} -> "com.foo.bar%2F.A%25"
  std::string ToUrlEncodedString() const {
    std::string s = ToString();
    Replace(s, "%", "%25");
    Replace(s, "/", "%2F");
    return s;
  }

 private:
  static bool Replace(std::string& str, const std::string& from, const std::string& to) {
    // TODO: call in a loop to replace all occurrences, not just the first one.
    const size_t start_pos = str.find(from);
    if (start_pos == std::string::npos) {
      return false;
    }

    str.replace(start_pos, from.length(), to);

    return true;
}
};

std::ostream& operator<<(std::ostream& os, const AppComponentName& name) {
  os << name.ToString();
  return os;
}

// Main logic of the #OnAppLaunchEvent scan method.
//
// All functions are called from the same thread as the event manager
// functions.
//
// This is a data type, it's moved (std::move) around from one iteration
// of #scan to another.
struct AppLaunchEventState {
  std::optional<AppComponentName> component_name_;

  bool is_tracing_{false};
  std::optional<rxcpp::composite_subscription> rx_lifetime_;
  std::vector<rxcpp::composite_subscription> rx_in_flight_;

  borrowed<perfetto::RxProducerFactory*> perfetto_factory_;  // not null
  borrowed<observe_on_one_worker*> thread_;  // not null
  borrowed<observe_on_one_worker*> io_thread_;  // not null

  explicit AppLaunchEventState(borrowed<perfetto::RxProducerFactory*> perfetto_factory,
                               borrowed<observe_on_one_worker*> thread,
                               borrowed<observe_on_one_worker*> io_thread) {
    perfetto_factory_ = perfetto_factory;
    DCHECK(perfetto_factory_ != nullptr);

    thread_ = thread;
    DCHECK(thread_ != nullptr);

    io_thread_ = io_thread;
    DCHECK(io_thread_ != nullptr);
  }

  // Updates the values in this struct only as a side effect.
  //
  // May create and fire a new rx chain on the same threads as passed
  // in by the constructors.
  void OnNewEvent(const AppLaunchEvent& event) {
    LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: " << event;

    using Type = AppLaunchEvent::Type;

    switch (event.type) {
      case Type::kIntentStarted: {
        DCHECK(!IsTracing());
        // Optimistically start tracing if we have the activity in the intent.
        if (!event.intent_proto->has_component()) {
          // Can't do anything if there is no component in the proto.
          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent: no component, can't trace";
          break;
        }

        const std::string& package_name = event.intent_proto->component().package_name();
        const std::string& class_name = event.intent_proto->component().class_name();
        AppComponentName component_name{package_name, class_name};

        component_name_ = component_name;
        rx_lifetime_ = StartTracing(std::move(component_name));

        break;
      }
      case Type::kIntentFailed:
        AbortTrace();
        break;
      case Type::kActivityLaunched: {
        // Cancel tracing for warm/hot.
        // Restart tracing if the activity was unexpected.

        AppLaunchEvent::Temperature temperature = event.temperature;
        if (temperature != AppLaunchEvent::Temperature::kCold) {
          LOG(DEBUG) << "AppLaunchEventState#OnNewEvent aborting trace due to non-cold temperature";
          AbortTrace();
        } else if (!IsTracing()) {  // and the temperature is Cold.
          // Start late trace when intent didn't have a component name
          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent need to start new trace";

          const std::string& title = event.activity_record_proto->identifier().title();
          if (!AppComponentName::HasAppComponentName(title)) {
            // Proto comment claim this is sometimes a window title.
            // We need the actual 'package/component' here, so just ignore it if it's a title.
            LOG(WARNING) << "App launched without a component name: " << event;
            break;
          }

          AppComponentName component_name = AppComponentName::FromString(title);

          component_name_ = component_name;
          rx_lifetime_ = StartTracing(std::move(component_name));
        } else {
          // FIXME: match actual component name against intent component name.
          // abort traces if they don't match.

          LOG(VERBOSE) << "AppLaunchEventState#OnNewEvent already tracing";
        }
        break;
      }
      case Type::kActivityLaunchFinished:
        // Finish tracing and collect trace buffer.
        //
        // TODO: this happens automatically when perfetto finishes its
        // trace duration.
        if (IsTracing()) {
          MarkPendingTrace();
        }
        break;
      case Type::kActivityLaunchCancelled:
        // Abort tracing.
        AbortTrace();
        break;
      default:
        DCHECK(false) << "invalid type: " << event;  // binder layer should've rejected this.
        LOG(ERROR) << "invalid type: " << event;  // binder layer should've rejected this.
    }
  }

  bool IsTracing() const {
    return is_tracing_;
  }

  rxcpp::composite_subscription StartTracing(AppComponentName component_name) {
    DCHECK(!IsTracing());

    auto /*observable<PerfettoStreamCommand>*/ perfetto_commands =
      rxcpp::observable<>::just(PerfettoStreamCommand::kStartTracing)
          // wait 1x
          .concat(
              // Pick a value longer than the perfetto config delay_ms, so that we send
              // 'kShutdown' after tracing has already finished.
              rxcpp::observable<>::interval(std::chrono::milliseconds(10000))
                  .take(2)  // kStopTracing, kShutdown.
                  .map([](int value) {
                         // value is 1,2,3,...
                         return static_cast<PerfettoStreamCommand>(value);  // 1,2, ...
                       })
          );

    auto /*observable<PerfettoTraceProto>*/ trace_proto_stream =
        perfetto_factory_->CreateTraceStream(perfetto_commands);
    // This immediately connects to perfetto asynchronously.
    //
    // TODO: create a perfetto handle earlier, to minimize perfetto startup latency.

    rxcpp::composite_subscription lifetime;

    trace_proto_stream
      .tap([](const PerfettoTraceProto& trace_proto) {
             LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (1)";
           })
      .observe_on(*thread_)   // All work prior to 'observe_on' is handled on thread_.
      .subscribe_on(*thread_)   // All work prior to 'observe_on' is handled on thread_.
      .observe_on(*io_thread_)  // Write data on an idle-class-priority thread.
      .tap([](const PerfettoTraceProto& trace_proto) {
             LOG(VERBOSE) << "StartTracing -- PerfettoTraceProto received (2)";
           })
      .as_blocking()  // TODO: remove.
      .subscribe(/*out*/lifetime,
        /*on_next*/[component_name]
        (PerfettoTraceProto trace_proto) {
          std::string file_path = "/data/misc/iorapd/";
          file_path += component_name.ToUrlEncodedString();
          file_path += ".perfetto_trace.pb";

          // TODO: timestamp each file into a subdirectory.

          if (!trace_proto.WriteFullyToFile(file_path)) {
            LOG(ERROR) << "Failed to save TraceBuffer to " << file_path;
          } else {
            LOG(INFO) << "Perfetto TraceBuffer saved to file: " << file_path;
          }
        },
        /*on_error*/[](rxcpp::util::error_ptr err) {
          LOG(ERROR) << "Perfetto trace proto collection error: " << rxcpp::util::what(err);
        });

    is_tracing_ = true;

    return lifetime;
  }

  void AbortTrace() {
    LOG(VERBOSE) << "AppLaunchEventState - AbortTrace";
    is_tracing_ = false;
    if (rx_lifetime_) {
      // TODO: it would be good to call perfetto Destroy.

      LOG(VERBOSE) << "AppLaunchEventState - AbortTrace - Unsubscribe";
      rx_lifetime_->unsubscribe();
      rx_lifetime_.reset();
    }
  }

  void MarkPendingTrace() {
    LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace";
    DCHECK(is_tracing_);
    DCHECK(rx_lifetime_.has_value());

    if (rx_lifetime_) {
      LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime moved";
      // Don't unsubscribe because that would cause the perfetto TraceBuffer
      // to get dropped on the floor.
      //
      // Instead, we want to let it finish and write it out to a file.
      rx_in_flight_.push_back(*std::move(rx_lifetime_));
      rx_lifetime_.reset();
    } else {
      LOG(VERBOSE) << "AppLaunchEventState - MarkPendingTrace - lifetime was empty";
    }

    // FIXME: how do we clear this vector?
  }
};

// Convert callback pattern into reactive pattern.
struct AppLaunchEventSubject {
  using RefWrapper =
    std::reference_wrapper<const AppLaunchEvent>;

  AppLaunchEventSubject() {}

  void Subscribe(rxcpp::subscriber<RefWrapper> subscriber) {
    DCHECK(ready_ != true) << "Cannot Subscribe twice";

    subscriber_ = std::move(subscriber);

    // Release edge of synchronizes-with AcquireIsReady.
    ready_.store(true);
  }

  void OnNext(const AppLaunchEvent& e) {
    if (!AcquireIsReady()) {
      return;
    }

    if (!subscriber_->is_subscribed()) {
      return;
    }

    /*
     * TODO: fix upstream.
     *
     * Rx workaround: this fails to compile when
     * the observable is a reference type:
     *
     * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:354:18: error: multiple overloads of 'on_next' instantiate to the same signature 'void (const iorap::binder::AppLaunchEvent &) const'
     *   virtual void on_next(T&&) const {};
     *
     * external/Reactive-Extensions/RxCpp/Rx/v2/src/rxcpp/rx-observer.hpp:353:18: note: previous declaration is here
     *   virtual void on_next(T&) const {};
     *
     * (The workaround is to use reference_wrapper instead
     *  of const AppLaunchEvent&)
     */
    subscriber_->on_next(std::cref(e));

  }

  void OnCompleted() {
    if (!AcquireIsReady()) {
      return;
    }

    subscriber_->on_completed();
  }

 private:
  bool AcquireIsReady() {
    // Synchronizes-with the release-edge in Subscribe.
    // This can happen much later, only once the subscription actually happens.

    // However, as far as I know, 'rxcpp::subscriber' is not thread safe,
    // (but the observable chain itself can be made thread-safe via #observe_on, etc).
    // so we must avoid reading it until it has been fully synchronized.
    //
    // TODO: investigate rxcpp subscribers and see if we can get rid of this atomics,
    // to make it simpler.
    return ready_.load();
  }

  // TODO: also track the RequestId ?

  std::atomic<bool> ready_{false};


  std::optional<rxcpp::subscriber<RefWrapper>> subscriber_;
};

class EventManager::Impl {
 public:
  Impl(/*borrow*/perfetto::RxProducerFactory& perfetto_factory)
    : perfetto_factory_(perfetto_factory),
      worker_thread_(rxcpp::observe_on_new_thread()),
      worker_thread2_(rxcpp::observe_on_new_thread()),
      io_thread_(perfetto::ObserveOnNewIoThread()) {

    // TODO: read all properties from one config class.
    tracing_allowed_ = ::android::base::GetBoolProperty("iorapd.perfetto.enable", /*default*/false);

    if (tracing_allowed_) {
      rx_lifetime_ = InitializeRxGraph();
    } else {
      LOG(WARNING) << "Tracing disabled by iorapd.perfetto.enable=false";
    }
  }

  bool OnAppLaunchEvent(RequestId request_id,
                        const AppLaunchEvent& event) {
    LOG(VERBOSE) << "EventManager::OnAppLaunchEvent("
                 << "request_id=" << request_id.request_id << ","
                 << event;

    app_launch_event_subject_.OnNext(event);

    return true;
  }

  rxcpp::composite_subscription InitializeRxGraph() {
    LOG(VERBOSE) << "EventManager::InitializeRxGraph";

    app_launch_events_ = rxcpp::observable<>::create<AppLaunchEventRefWrapper>(
      [&](rxcpp::subscriber<AppLaunchEventRefWrapper> subscriber) {
        app_launch_event_subject_.Subscribe(std::move(subscriber));
      });

    rxcpp::composite_subscription lifetime;

    AppLaunchEventState initial_state{&perfetto_factory_, &worker_thread2_, &io_thread_};
    app_launch_events_
      .subscribe_on(worker_thread_)
      .scan(std::move(initial_state),
            [](AppLaunchEventState state, AppLaunchEventRefWrapper event) {
              state.OnNewEvent(event.get());
              return state;
            })
      .subscribe(/*out*/lifetime, [](const AppLaunchEventState& state) {
                   // Intentionally left blank.
                   (void)state;
                 });

    return lifetime;
  }

  perfetto::RxProducerFactory& perfetto_factory_;
  bool tracing_allowed_{true};

  using AppLaunchEventRefWrapper = AppLaunchEventSubject::RefWrapper;
  rxcpp::observable<AppLaunchEventRefWrapper> app_launch_events_;
  AppLaunchEventSubject app_launch_event_subject_;

  rxcpp::observable<RequestId> completed_requests_;

  // regular-priority thread to handle binder callbacks.
  observe_on_one_worker worker_thread_;
  observe_on_one_worker worker_thread2_;
  // low priority idle-class thread for IO operations.
  observe_on_one_worker io_thread_;

  rxcpp::composite_subscription rx_lifetime_;

//INTENTIONAL_COMPILER_ERROR_HERE:
  // FIXME:
  // ok so we want to expose a 'BlockingSubscribe' or a 'Subscribe' or some kind of function
  // that the main thread can call. This would subscribe on all the observables we internally
  // have here (probably on an event-manager-dedicated thread for simplicity).
  //
  // ideally we'd just reuse the binder thread to handle the events but I'm not super sure,
  // maybe this already works with the identity_current_thread coordination?
};
using Impl = EventManager::Impl;

EventManager::EventManager(perfetto::RxProducerFactory& perfetto_factory)
    : impl_(new Impl(perfetto_factory)) {}

std::shared_ptr<EventManager> EventManager::Create() {
  static perfetto::PerfettoDependencies::Injector injector{
    perfetto::PerfettoDependencies::CreateComponent
  };
  static perfetto::RxProducerFactory producer_factory{
    /*borrow*/injector
  };
  return EventManager::Create(/*borrow*/producer_factory);
}

std::shared_ptr<EventManager> EventManager::Create(perfetto::RxProducerFactory& perfetto_factory) {
  std::shared_ptr<EventManager> p{new EventManager{/*borrow*/perfetto_factory}};
  return p;
}

bool EventManager::OnAppLaunchEvent(RequestId request_id,
                                    const AppLaunchEvent& event) {
  return impl_->OnAppLaunchEvent(request_id, event);
}

}  // namespace iorap::manager