C++程序  |  204行  |  5.69 KB

#include "consumer_channel.h"

#include <log/log.h>
#include <utils/Trace.h>

#include <thread>

#include <private/dvr/bufferhub_rpc.h>
#include "producer_channel.h"

using android::pdx::BorrowedHandle;
using android::pdx::Channel;
using android::pdx::ErrorStatus;
using android::pdx::Message;
using android::pdx::Status;
using android::pdx::rpc::DispatchRemoteMethod;

namespace android {
namespace dvr {

ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id,
                                 int channel_id, uint64_t consumer_state_bit,
                                 const std::shared_ptr<Channel> producer)
    : BufferHubChannel(service, buffer_id, channel_id, kConsumerType),
      consumer_state_bit_(consumer_state_bit),
      producer_(producer) {
  GetProducer()->AddConsumer(this);
}

ConsumerChannel::~ConsumerChannel() {
  ALOGD_IF(TRACE,
           "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d",
           channel_id(), buffer_id());

  if (auto producer = GetProducer()) {
    producer->RemoveConsumer(this);
  }
}

BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const {
  BufferHubChannel::BufferInfo info;
  if (auto producer = GetProducer()) {
    // If producer has not hung up, copy most buffer info from the producer.
    info = producer->GetBufferInfo();
  } else {
    info.signaled_mask = consumer_state_bit();
  }
  info.id = buffer_id();
  return info;
}

std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const {
  return std::static_pointer_cast<ProducerChannel>(producer_.lock());
}

void ConsumerChannel::HandleImpulse(Message& message) {
  ATRACE_NAME("ConsumerChannel::HandleImpulse");
  switch (message.GetOp()) {
    case BufferHubRPC::ConsumerAcquire::Opcode:
      OnConsumerAcquire(message);
      break;
    case BufferHubRPC::ConsumerRelease::Opcode:
      OnConsumerRelease(message, {});
      break;
  }
}

bool ConsumerChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ConsumerChannel::HandleMessage");
  auto producer = GetProducer();
  if (!producer)
    REPLY_ERROR_RETURN(message, EPIPE, true);

  switch (message.GetOp()) {
    case BufferHubRPC::GetBuffer::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
          *this, &ConsumerChannel::OnGetBuffer, message);
      return true;

    case BufferHubRPC::NewConsumer::Opcode:
      DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
          *producer, &ProducerChannel::OnNewConsumer, message);
      return true;

    case BufferHubRPC::ConsumerAcquire::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>(
          *this, &ConsumerChannel::OnConsumerAcquire, message);
      return true;

    case BufferHubRPC::ConsumerRelease::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>(
          *this, &ConsumerChannel::OnConsumerRelease, message);
      return true;

    case BufferHubRPC::ConsumerSetIgnore::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>(
          *this, &ConsumerChannel::OnConsumerSetIgnore, message);
      return true;

    default:
      return false;
  }
}

Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer(
    Message& /*message*/) {
  ATRACE_NAME("ConsumerChannel::OnGetBuffer");
  ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id());
  if (auto producer = GetProducer()) {
    return {producer->GetBuffer(consumer_state_bit_)};
  } else {
    return ErrorStatus(EPIPE);
  }
}

Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) {
  ATRACE_NAME("ConsumerChannel::OnConsumerAcquire");
  auto producer = GetProducer();
  if (!producer)
    return ErrorStatus(EPIPE);

  if (acquired_ || released_) {
    ALOGE(
        "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: "
        "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
        ignored_, acquired_, released_, message.GetChannelId(),
        producer->buffer_id());
    return ErrorStatus(EBUSY);
  } else {
    auto status = producer->OnConsumerAcquire(message);
    if (status) {
      ClearAvailable();
      acquired_ = true;
    }
    return status;
  }
}

Status<void> ConsumerChannel::OnConsumerRelease(Message& message,
                                                LocalFence release_fence) {
  ATRACE_NAME("ConsumerChannel::OnConsumerRelease");
  auto producer = GetProducer();
  if (!producer)
    return ErrorStatus(EPIPE);

  if (!acquired_ || released_) {
    ALOGE(
        "ConsumerChannel::OnConsumerRelease: Release when not acquired: "
        "ignored=%d acquired=%d released=%d channel_id=%d buffer_id=%d",
        ignored_, acquired_, released_, message.GetChannelId(),
        producer->buffer_id());
    return ErrorStatus(EBUSY);
  } else {
    auto status =
        producer->OnConsumerRelease(message, std::move(release_fence));
    if (status) {
      ClearAvailable();
      acquired_ = false;
      released_ = true;
    }
    return status;
  }
}

Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) {
  ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore");
  auto producer = GetProducer();
  if (!producer)
    return ErrorStatus(EPIPE);

  ignored_ = ignored;
  if (ignored_ && acquired_) {
    // Update the producer if ignore is set after the consumer acquires the
    // buffer.
    ClearAvailable();
    producer->OnConsumerIgnored();
    acquired_ = false;
    released_ = true;
  }

  return {};
}

bool ConsumerChannel::OnProducerPosted() {
  if (ignored_) {
    acquired_ = false;
    released_ = true;
    return false;
  } else {
    acquired_ = false;
    released_ = false;
    SignalAvailable();
    return true;
  }
}

void ConsumerChannel::OnProducerClosed() {
  producer_.reset();
  Hangup();
}

}  // namespace dvr
}  // namespace android