/* * Copyright (C) 2017 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "src/tracing/ipc/consumer/consumer_ipc_client_impl.h" #include <inttypes.h> #include <string.h> #include "perfetto/base/task_runner.h" #include "perfetto/ipc/client.h" #include "perfetto/tracing/core/consumer.h" #include "perfetto/tracing/core/observable_events.h" #include "perfetto/tracing/core/trace_config.h" #include "perfetto/tracing/core/trace_stats.h" // TODO(fmayer): Add a test to check to what happens when ConsumerIPCClientImpl // gets destroyed w.r.t. the Consumer pointer. Also think to lifetime of the // Consumer* during the callbacks. namespace perfetto { // static. (Declared in include/tracing/ipc/consumer_ipc_client.h). std::unique_ptr<TracingService::ConsumerEndpoint> ConsumerIPCClient::Connect( const char* service_sock_name, Consumer* consumer, base::TaskRunner* task_runner) { return std::unique_ptr<TracingService::ConsumerEndpoint>( new ConsumerIPCClientImpl(service_sock_name, consumer, task_runner)); } ConsumerIPCClientImpl::ConsumerIPCClientImpl(const char* service_sock_name, Consumer* consumer, base::TaskRunner* task_runner) : consumer_(consumer), ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)), consumer_port_(this /* event_listener */), weak_ptr_factory_(this) { ipc_channel_->BindService(consumer_port_.GetWeakPtr()); } ConsumerIPCClientImpl::~ConsumerIPCClientImpl() = default; // Called by the IPC layer if the BindService() succeeds. void ConsumerIPCClientImpl::OnConnect() { connected_ = true; consumer_->OnConnect(); } void ConsumerIPCClientImpl::OnDisconnect() { PERFETTO_DLOG("Tracing service connection failure"); connected_ = false; consumer_->OnDisconnect(); } void ConsumerIPCClientImpl::EnableTracing(const TraceConfig& trace_config, base::ScopedFile fd) { if (!connected_) { PERFETTO_DLOG("Cannot EnableTracing(), not connected to tracing service"); return; } protos::EnableTracingRequest req; trace_config.ToProto(req.mutable_trace_config()); ipc::Deferred<protos::EnableTracingResponse> async_response; auto weak_this = weak_ptr_factory_.GetWeakPtr(); async_response.Bind( [weak_this](ipc::AsyncResult<protos::EnableTracingResponse> response) { if (weak_this) weak_this->OnEnableTracingResponse(std::move(response)); }); // |fd| will be closed when this function returns, but it's fine because the // IPC layer dup()'s it when sending the IPC. consumer_port_.EnableTracing(req, std::move(async_response), *fd); } void ConsumerIPCClientImpl::ChangeTraceConfig(const TraceConfig&) { if (!connected_) { PERFETTO_DLOG( "Cannot ChangeTraceConfig(), not connected to tracing service"); return; } ipc::Deferred<protos::ChangeTraceConfigResponse> async_response; async_response.Bind( [](ipc::AsyncResult<protos::ChangeTraceConfigResponse> response) { if (!response) PERFETTO_DLOG("ChangeTraceConfig() failed"); }); protos::ChangeTraceConfigRequest req; consumer_port_.ChangeTraceConfig(req, std::move(async_response)); } void ConsumerIPCClientImpl::StartTracing() { if (!connected_) { PERFETTO_DLOG("Cannot StartTracing(), not connected to tracing service"); return; } ipc::Deferred<protos::StartTracingResponse> async_response; async_response.Bind( [](ipc::AsyncResult<protos::StartTracingResponse> response) { if (!response) PERFETTO_DLOG("StartTracing() failed"); }); protos::StartTracingRequest req; consumer_port_.StartTracing(req, std::move(async_response)); } void ConsumerIPCClientImpl::DisableTracing() { if (!connected_) { PERFETTO_DLOG("Cannot DisableTracing(), not connected to tracing service"); return; } ipc::Deferred<protos::DisableTracingResponse> async_response; async_response.Bind( [](ipc::AsyncResult<protos::DisableTracingResponse> response) { if (!response) PERFETTO_DLOG("DisableTracing() failed"); }); consumer_port_.DisableTracing(protos::DisableTracingRequest(), std::move(async_response)); } void ConsumerIPCClientImpl::ReadBuffers() { if (!connected_) { PERFETTO_DLOG("Cannot ReadBuffers(), not connected to tracing service"); return; } ipc::Deferred<protos::ReadBuffersResponse> async_response; // The IPC layer guarantees that callbacks are destroyed after this object // is destroyed (by virtue of destroying the |consumer_port_|). In turn the // contract of this class expects the caller to not destroy the Consumer class // before having destroyed this class. Hence binding |this| here is safe. async_response.Bind( [this](ipc::AsyncResult<protos::ReadBuffersResponse> response) { OnReadBuffersResponse(std::move(response)); }); consumer_port_.ReadBuffers(protos::ReadBuffersRequest(), std::move(async_response)); } void ConsumerIPCClientImpl::OnReadBuffersResponse( ipc::AsyncResult<protos::ReadBuffersResponse> response) { if (!response) { PERFETTO_DLOG("ReadBuffers() failed"); return; } std::vector<TracePacket> trace_packets; for (auto& resp_slice : *response->mutable_slices()) { partial_packet_.AddSlice( Slice(std::unique_ptr<std::string>(resp_slice.release_data()))); if (resp_slice.last_slice_for_packet()) trace_packets.emplace_back(std::move(partial_packet_)); } if (!trace_packets.empty() || !response.has_more()) consumer_->OnTraceData(std::move(trace_packets), response.has_more()); } void ConsumerIPCClientImpl::OnEnableTracingResponse( ipc::AsyncResult<protos::EnableTracingResponse> response) { if (!response || response->disabled()) consumer_->OnTracingDisabled(); } void ConsumerIPCClientImpl::FreeBuffers() { if (!connected_) { PERFETTO_DLOG("Cannot FreeBuffers(), not connected to tracing service"); return; } protos::FreeBuffersRequest req; ipc::Deferred<protos::FreeBuffersResponse> async_response; async_response.Bind( [](ipc::AsyncResult<protos::FreeBuffersResponse> response) { if (!response) PERFETTO_DLOG("FreeBuffers() failed"); }); consumer_port_.FreeBuffers(req, std::move(async_response)); } void ConsumerIPCClientImpl::Flush(uint32_t timeout_ms, FlushCallback callback) { if (!connected_) { PERFETTO_DLOG("Cannot Flush(), not connected to tracing service"); return callback(/*success=*/false); } protos::FlushRequest req; req.set_timeout_ms(static_cast<uint32_t>(timeout_ms)); ipc::Deferred<protos::FlushResponse> async_response; async_response.Bind( [callback](ipc::AsyncResult<protos::FlushResponse> response) { callback(!!response); }); consumer_port_.Flush(req, std::move(async_response)); } void ConsumerIPCClientImpl::Detach(const std::string& key) { if (!connected_) { PERFETTO_DLOG("Cannot Detach(), not connected to tracing service"); return; } protos::DetachRequest req; req.set_key(key); ipc::Deferred<protos::DetachResponse> async_response; auto weak_this = weak_ptr_factory_.GetWeakPtr(); async_response.Bind( [weak_this](ipc::AsyncResult<protos::DetachResponse> response) { if (weak_this) weak_this->consumer_->OnDetach(!!response); }); consumer_port_.Detach(req, std::move(async_response)); } void ConsumerIPCClientImpl::Attach(const std::string& key) { if (!connected_) { PERFETTO_DLOG("Cannot Attach(), not connected to tracing service"); return; } { protos::AttachRequest req; req.set_key(key); ipc::Deferred<protos::AttachResponse> async_response; auto weak_this = weak_ptr_factory_.GetWeakPtr(); async_response.Bind([weak_this]( ipc::AsyncResult<protos::AttachResponse> response) { if (!weak_this) return; TraceConfig trace_config; if (!response) { weak_this->consumer_->OnAttach(/*success=*/false, trace_config); return; } trace_config.FromProto(response->trace_config()); // If attached succesfully, also attach to the end-of-trace // notificaton callback, via EnableTracing(attach_notification_only). protos::EnableTracingRequest enable_req; enable_req.set_attach_notification_only(true); ipc::Deferred<protos::EnableTracingResponse> enable_resp; enable_resp.Bind( [weak_this](ipc::AsyncResult<protos::EnableTracingResponse> resp) { if (weak_this) weak_this->OnEnableTracingResponse(std::move(resp)); }); weak_this->consumer_port_.EnableTracing(enable_req, std::move(enable_resp)); weak_this->consumer_->OnAttach(/*success=*/true, trace_config); }); consumer_port_.Attach(req, std::move(async_response)); } } void ConsumerIPCClientImpl::GetTraceStats() { if (!connected_) { PERFETTO_DLOG("Cannot GetTraceStats(), not connected to tracing service"); return; } protos::GetTraceStatsRequest req; ipc::Deferred<protos::GetTraceStatsResponse> async_response; // The IPC layer guarantees that callbacks are destroyed after this object // is destroyed (by virtue of destroying the |consumer_port_|). In turn the // contract of this class expects the caller to not destroy the Consumer class // before having destroyed this class. Hence binding |this| here is safe. async_response.Bind( [this](ipc::AsyncResult<protos::GetTraceStatsResponse> response) { TraceStats trace_stats; if (!response) { consumer_->OnTraceStats(/*success=*/false, trace_stats); return; } trace_stats.FromProto(response->trace_stats()); consumer_->OnTraceStats(/*success=*/true, trace_stats); }); consumer_port_.GetTraceStats(req, std::move(async_response)); } void ConsumerIPCClientImpl::ObserveEvents(uint32_t enabled_event_types) { if (!connected_) { PERFETTO_DLOG("Cannot ObserveEvents(), not connected to tracing service"); return; } protos::ObserveEventsRequest req; if (enabled_event_types & ObservableEventType::kDataSourceInstances) { req.add_events_to_observe( protos::ObservableEvents::TYPE_DATA_SOURCES_INSTANCES); } ipc::Deferred<protos::ObserveEventsResponse> async_response; // The IPC layer guarantees that callbacks are destroyed after this object // is destroyed (by virtue of destroying the |consumer_port_|). In turn the // contract of this class expects the caller to not destroy the Consumer class // before having destroyed this class. Hence binding |this| here is safe. async_response.Bind( [this](ipc::AsyncResult<protos::ObserveEventsResponse> response) { // Skip empty response, which the service sends to close the stream. if (!response->events().instance_state_changes().size()) { PERFETTO_DCHECK(!response.has_more()); return; } ObservableEvents events; events.FromProto(response->events()); consumer_->OnObservableEvents(events); }); consumer_port_.ObserveEvents(req, std::move(async_response)); } } // namespace perfetto