#include "consumer_channel.h" #include <log/log.h> #include <utils/Trace.h> #include <thread> #include <private/dvr/bufferhub_rpc.h> #include "producer_channel.h" using android::pdx::BorrowedHandle; using android::pdx::Channel; using android::pdx::ErrorStatus; using android::pdx::Message; using android::pdx::Status; using android::pdx::rpc::DispatchRemoteMethod; namespace android { namespace dvr { ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id, int channel_id, uint64_t consumer_state_bit, const std::shared_ptr<Channel> producer) : BufferHubChannel(service, buffer_id, channel_id, kConsumerType), consumer_state_bit_(consumer_state_bit), producer_(producer) { GetProducer()->AddConsumer(this); } ConsumerChannel::~ConsumerChannel() { ALOGD_IF(TRACE, "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d", channel_id(), buffer_id()); if (auto producer = GetProducer()) { producer->RemoveConsumer(this); } } BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const { BufferHubChannel::BufferInfo info; if (auto producer = GetProducer()) { // If producer has not hung up, copy most buffer info from the producer. info = producer->GetBufferInfo(); } else { info.signaled_mask = consumer_state_bit(); } info.id = buffer_id(); return info; } std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const { return std::static_pointer_cast<ProducerChannel>(producer_.lock()); } void ConsumerChannel::HandleImpulse(Message& message) { ATRACE_NAME("ConsumerChannel::HandleImpulse"); switch (message.GetOp()) { case BufferHubRPC::ConsumerAcquire::Opcode: OnConsumerAcquire(message); break; case BufferHubRPC::ConsumerRelease::Opcode: OnConsumerRelease(message, {}); break; } } bool ConsumerChannel::HandleMessage(Message& message) { ATRACE_NAME("ConsumerChannel::HandleMessage"); auto producer = GetProducer(); if (!producer) REPLY_ERROR_RETURN(message, EPIPE, true); switch (message.GetOp()) { case BufferHubRPC::GetBuffer::Opcode: DispatchRemoteMethod<BufferHubRPC::GetBuffer>( *this, &ConsumerChannel::OnGetBuffer, message); return true; case BufferHubRPC::NewConsumer::Opcode: DispatchRemoteMethod<BufferHubRPC::NewConsumer>( *producer, &ProducerChannel::OnNewConsumer, message); return true; case BufferHubRPC::ConsumerAcquire::Opcode: DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>( *this, &ConsumerChannel::OnConsumerAcquire, message); return true; case BufferHubRPC::ConsumerRelease::Opcode: DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>( *this, &ConsumerChannel::OnConsumerRelease, message); return true; case BufferHubRPC::ConsumerSetIgnore::Opcode: DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>( *this, &ConsumerChannel::OnConsumerSetIgnore, message); return true; default: return false; } } Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer( Message& /*message*/) { ATRACE_NAME("ConsumerChannel::OnGetBuffer"); ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id()); if (auto producer = GetProducer()) { return {producer->GetBuffer(consumer_state_bit_)}; } else { return ErrorStatus(EPIPE); } } Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) { ATRACE_NAME("ConsumerChannel::OnConsumerAcquire"); auto producer = GetProducer(); if (!producer) return ErrorStatus(EPIPE); if (acquired_ || released_) { ALOGE( "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: " "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d", ignored_, acquired_, released_, message.GetChannelId(), producer->buffer_id()); return ErrorStatus(EBUSY); } else { auto status = producer->OnConsumerAcquire(message); if (status) { ClearAvailable(); acquired_ = true; } return status; } } Status<void> ConsumerChannel::OnConsumerRelease(Message& message, LocalFence release_fence) { ATRACE_NAME("ConsumerChannel::OnConsumerRelease"); auto producer = GetProducer(); if (!producer) return ErrorStatus(EPIPE); if (!acquired_ || released_) { ALOGE( "ConsumerChannel::OnConsumerRelease: Release when not acquired: " "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d", ignored_, acquired_, released_, message.GetChannelId(), producer->buffer_id()); return ErrorStatus(EBUSY); } else { auto status = producer->OnConsumerRelease(message, std::move(release_fence)); if (status) { ClearAvailable(); acquired_ = false; released_ = true; } return status; } } Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) { ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore"); auto producer = GetProducer(); if (!producer) return ErrorStatus(EPIPE); ignored_ = ignored; if (ignored_ && acquired_) { // Update the producer if ignore is set after the consumer acquires the // buffer. ClearAvailable(); producer->OnConsumerIgnored(); acquired_ = false; released_ = true; } return {}; } bool ConsumerChannel::OnProducerPosted() { if (ignored_) { acquired_ = false; released_ = true; return false; } else { acquired_ = false; released_ = false; SignalAvailable(); return true; } } void ConsumerChannel::OnProducerClosed() { producer_.reset(); Hangup(); } } // namespace dvr } // namespace android