/*
* Copyright (C) 2009 The Android Open Source Project
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//#define LOG_NDEBUG 0
#define LOG_TAG "BpMediaSource"
#include <utils/Log.h>
#include <inttypes.h>
#include <stdint.h>
#include <sys/types.h>
#include <binder/Parcel.h>
#include <media/IMediaSource.h>
#include <media/stagefright/MediaBuffer.h>
#include <media/stagefright/MediaBufferGroup.h>
#include <media/stagefright/MediaSource.h>
#include <media/stagefright/MetaData.h>
namespace android {
enum {
START = IBinder::FIRST_CALL_TRANSACTION,
STOP,
PAUSE,
GETFORMAT,
// READ, deprecated
READMULTIPLE,
RELEASE_BUFFER,
SUPPORT_NONBLOCKING_READ,
};
enum {
NULL_BUFFER,
SHARED_BUFFER,
INLINE_BUFFER,
SHARED_BUFFER_INDEX,
};
class RemoteMediaBufferWrapper : public MediaBuffer {
public:
RemoteMediaBufferWrapper(const sp<IMemory> &mem)
: MediaBuffer(mem) {
ALOGV("RemoteMediaBufferWrapper: creating %p", this);
}
protected:
virtual ~RemoteMediaBufferWrapper() {
// Release our interest in the MediaBuffer's shared memory.
int32_t old = addRemoteRefcount(-1);
ALOGV("RemoteMediaBufferWrapper: releasing %p, refcount %d", this, old - 1);
mMemory.clear(); // don't set the dead object flag.
}
};
class BpMediaSource : public BpInterface<IMediaSource> {
public:
explicit BpMediaSource(const sp<IBinder>& impl)
: BpInterface<IMediaSource>(impl), mBuffersSinceStop(0)
{
}
virtual status_t start(MetaData *params) {
ALOGV("start");
Parcel data, reply;
data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
if (params) {
params->writeToParcel(data);
}
status_t ret = remote()->transact(START, data, &reply);
if (ret == NO_ERROR && params) {
ALOGW("ignoring potentially modified MetaData from start");
ALOGW("input:");
params->dumpToLog();
sp<MetaData> meta = MetaData::createFromParcel(reply);
ALOGW("output:");
meta->dumpToLog();
}
return ret;
}
virtual status_t stop() {
ALOGV("stop");
Parcel data, reply;
data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
status_t status = remote()->transact(STOP, data, &reply);
mMemoryCache.reset();
mBuffersSinceStop = 0;
return status;
}
virtual sp<MetaData> getFormat() {
ALOGV("getFormat");
Parcel data, reply;
data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
status_t ret = remote()->transact(GETFORMAT, data, &reply);
if (ret == NO_ERROR) {
mMetaData = MetaData::createFromParcel(reply);
return mMetaData;
}
return NULL;
}
virtual status_t read(MediaBuffer **buffer, const ReadOptions *options) {
Vector<MediaBuffer *> buffers;
status_t ret = readMultiple(&buffers, 1 /* maxNumBuffers */, options);
*buffer = buffers.size() == 0 ? nullptr : buffers[0];
ALOGV("read status %d, bufferCount %u, sinceStop %u",
ret, *buffer != nullptr, mBuffersSinceStop);
return ret;
}
virtual status_t readMultiple(
Vector<MediaBuffer *> *buffers, uint32_t maxNumBuffers, const ReadOptions *options) {
ALOGV("readMultiple");
if (buffers == NULL || !buffers->isEmpty()) {
return BAD_VALUE;
}
Parcel data, reply;
data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
data.writeUint32(maxNumBuffers);
if (options != nullptr) {
data.writeByteArray(sizeof(*options), (uint8_t*) options);
}
status_t ret = remote()->transact(READMULTIPLE, data, &reply);
mMemoryCache.gc();
if (ret != NO_ERROR) {
return ret;
}
// wrap the returned data in a vector of MediaBuffers
int32_t buftype;
uint32_t bufferCount = 0;
while ((buftype = reply.readInt32()) != NULL_BUFFER) {
LOG_ALWAYS_FATAL_IF(bufferCount >= maxNumBuffers,
"Received %u+ buffers and requested %u buffers",
bufferCount + 1, maxNumBuffers);
MediaBuffer *buf;
if (buftype == SHARED_BUFFER || buftype == SHARED_BUFFER_INDEX) {
uint64_t index = reply.readUint64();
ALOGV("Received %s index %llu",
buftype == SHARED_BUFFER ? "SHARED_BUFFER" : "SHARED_BUFFER_INDEX",
(unsigned long long) index);
sp<IMemory> mem;
if (buftype == SHARED_BUFFER) {
sp<IBinder> binder = reply.readStrongBinder();
mem = interface_cast<IMemory>(binder);
LOG_ALWAYS_FATAL_IF(mem.get() == nullptr,
"Received NULL IMemory for shared buffer");
mMemoryCache.insert(index, mem);
} else {
mem = mMemoryCache.lookup(index);
LOG_ALWAYS_FATAL_IF(mem.get() == nullptr,
"Received invalid IMemory index for shared buffer: %llu",
(unsigned long long)index);
}
size_t offset = reply.readInt32();
size_t length = reply.readInt32();
buf = new RemoteMediaBufferWrapper(mem);
buf->set_range(offset, length);
buf->meta_data()->updateFromParcel(reply);
} else { // INLINE_BUFFER
int32_t len = reply.readInt32();
ALOGV("INLINE_BUFFER status %d and len %d", ret, len);
buf = new MediaBuffer(len);
reply.read(buf->data(), len);
buf->meta_data()->updateFromParcel(reply);
}
buffers->push_back(buf);
++bufferCount;
++mBuffersSinceStop;
}
ret = reply.readInt32();
ALOGV("readMultiple status %d, bufferCount %u, sinceStop %u",
ret, bufferCount, mBuffersSinceStop);
return ret;
}
// Binder proxy adds readMultiple support.
virtual bool supportReadMultiple() {
return true;
}
virtual bool supportNonblockingRead() {
ALOGV("supportNonblockingRead");
Parcel data, reply;
data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
status_t ret = remote()->transact(SUPPORT_NONBLOCKING_READ, data, &reply);
if (ret == NO_ERROR) {
return reply.readInt32() != 0;
}
return false;
}
virtual status_t pause() {
ALOGV("pause");
Parcel data, reply;
data.writeInterfaceToken(BpMediaSource::getInterfaceDescriptor());
return remote()->transact(PAUSE, data, &reply);
}
virtual status_t setBuffers(const Vector<MediaBuffer *> & buffers __unused) {
ALOGV("setBuffers NOT IMPLEMENTED");
return ERROR_UNSUPPORTED; // default
}
private:
uint32_t mBuffersSinceStop; // Buffer tracking variable
// NuPlayer passes pointers-to-metadata around, so we use this to keep the metadata alive
// XXX: could we use this for caching, or does metadata change on the fly?
sp<MetaData> mMetaData;
// Cache all IMemory objects received from MediaExtractor.
// We gc IMemory objects that are no longer active (referenced by a MediaBuffer).
struct MemoryCache {
sp<IMemory> lookup(uint64_t index) {
auto p = mIndexToMemory.find(index);
if (p == mIndexToMemory.end()) {
ALOGE("cannot find index!");
return nullptr;
}
return p->second;
}
void insert(uint64_t index, const sp<IMemory> &mem) {
if (mIndexToMemory.find(index) != mIndexToMemory.end()) {
ALOGE("index %llu already present", (unsigned long long)index);
return;
}
(void)mIndexToMemory.emplace(index, mem);
}
void reset() {
mIndexToMemory.clear();
}
void gc() {
for (auto it = mIndexToMemory.begin(); it != mIndexToMemory.end(); ) {
if (MediaBuffer::isDeadObject(it->second)) {
it = mIndexToMemory.erase(it);
} else {
++it;
}
}
}
private:
// C++14 unordered_map erase on iterator is stable; C++11 has no guarantee.
std::map<uint64_t, sp<IMemory>> mIndexToMemory;
} mMemoryCache;
};
IMPLEMENT_META_INTERFACE(MediaSource, "android.media.IMediaSource");
#undef LOG_TAG
#define LOG_TAG "BnMediaSource"
BnMediaSource::BnMediaSource()
: mBuffersSinceStop(0)
, mGroup(new MediaBufferGroup(kBinderMediaBuffers /* growthLimit */)) {
}
BnMediaSource::~BnMediaSource() {
}
status_t BnMediaSource::onTransact(
uint32_t code, const Parcel& data, Parcel* reply, uint32_t flags)
{
switch (code) {
case START: {
ALOGV("start");
CHECK_INTERFACE(IMediaSource, data, reply);
sp<MetaData> meta;
if (data.dataAvail()) {
meta = MetaData::createFromParcel(data);
}
status_t ret = start(meta.get());
if (ret == NO_ERROR && meta != NULL) {
meta->writeToParcel(*reply);
}
return ret;
}
case STOP: {
ALOGV("stop");
CHECK_INTERFACE(IMediaSource, data, reply);
mGroup->signalBufferReturned(nullptr);
status_t status = stop();
mIndexCache.reset();
mBuffersSinceStop = 0;
return status;
}
case PAUSE: {
ALOGV("pause");
CHECK_INTERFACE(IMediaSource, data, reply);
mGroup->signalBufferReturned(nullptr);
return pause();
}
case GETFORMAT: {
ALOGV("getFormat");
CHECK_INTERFACE(IMediaSource, data, reply);
sp<MetaData> meta = getFormat();
if (meta != NULL) {
meta->writeToParcel(*reply);
return NO_ERROR;
}
return UNKNOWN_ERROR;
}
case READMULTIPLE: {
ALOGV("readMultiple");
CHECK_INTERFACE(IMediaSource, data, reply);
// Get max number of buffers to read.
uint32_t maxNumBuffers;
data.readUint32(&maxNumBuffers);
if (maxNumBuffers > kMaxNumReadMultiple) {
maxNumBuffers = kMaxNumReadMultiple;
}
// Get read options, if any.
ReadOptions opts;
uint32_t len;
const bool useOptions =
data.readUint32(&len) == NO_ERROR
&& len == sizeof(opts)
&& data.read((void *)&opts, len) == NO_ERROR;
mGroup->signalBufferReturned(nullptr);
mIndexCache.gc();
size_t inlineTransferSize = 0;
status_t ret = NO_ERROR;
uint32_t bufferCount = 0;
for (; bufferCount < maxNumBuffers; ++bufferCount, ++mBuffersSinceStop) {
MediaBuffer *buf = nullptr;
ret = read(&buf, useOptions ? &opts : nullptr);
opts.clearNonPersistent(); // Remove options that only apply to first buffer.
if (ret != NO_ERROR || buf == nullptr) {
break;
}
// Even if we're using shared memory, we might not want to use it, since for small
// sizes it's faster to copy data through the Binder transaction
// On the other hand, if the data size is large enough, it's better to use shared
// memory. When data is too large, binder can't handle it.
//
// TODO: reduce MediaBuffer::kSharedMemThreshold
MediaBuffer *transferBuf = nullptr;
const size_t length = buf->range_length();
size_t offset = buf->range_offset();
if (length >= (supportNonblockingRead() && buf->mMemory != nullptr ?
kTransferSharedAsSharedThreshold : kTransferInlineAsSharedThreshold)) {
if (buf->mMemory != nullptr) {
ALOGV("Use shared memory: %zu", length);
transferBuf = buf;
} else {
ALOGD("Large buffer %zu without IMemory!", length);
ret = mGroup->acquire_buffer(
&transferBuf, false /* nonBlocking */, length);
if (ret != OK
|| transferBuf == nullptr
|| transferBuf->mMemory == nullptr) {
ALOGW("Failed to acquire shared memory, size %zu, ret %d",
length, ret);
if (transferBuf != nullptr) {
transferBuf->release();
transferBuf = nullptr;
}
// Current buffer transmit inline; no more additional buffers.
maxNumBuffers = 0;
} else {
memcpy(transferBuf->data(), (uint8_t*)buf->data() + offset, length);
offset = 0;
if (!mGroup->has_buffers()) {
maxNumBuffers = 0; // No more MediaBuffers, stop readMultiple.
}
}
}
}
if (transferBuf != nullptr) { // Using shared buffers.
if (!transferBuf->isObserved() && transferBuf != buf) {
// Transfer buffer must be part of a MediaBufferGroup.
ALOGV("adding shared memory buffer %p to local group", transferBuf);
mGroup->add_buffer(transferBuf);
transferBuf->add_ref(); // We have already acquired buffer.
}
uint64_t index = mIndexCache.lookup(transferBuf->mMemory);
if (index == 0) {
index = mIndexCache.insert(transferBuf->mMemory);
reply->writeInt32(SHARED_BUFFER);
reply->writeUint64(index);
reply->writeStrongBinder(IInterface::asBinder(transferBuf->mMemory));
ALOGV("SHARED_BUFFER(%p) %llu",
transferBuf, (unsigned long long)index);
} else {
reply->writeInt32(SHARED_BUFFER_INDEX);
reply->writeUint64(index);
ALOGV("SHARED_BUFFER_INDEX(%p) %llu",
transferBuf, (unsigned long long)index);
}
reply->writeInt32(offset);
reply->writeInt32(length);
buf->meta_data()->writeToParcel(*reply);
transferBuf->addRemoteRefcount(1);
if (transferBuf != buf) {
transferBuf->release(); // release local ref
} else if (!supportNonblockingRead()) {
maxNumBuffers = 0; // stop readMultiple with one shared buffer.
}
} else {
ALOGV_IF(buf->mMemory != nullptr,
"INLINE(%p) %zu shared mem available, but only %zu used",
buf, buf->mMemory->size(), length);
reply->writeInt32(INLINE_BUFFER);
reply->writeByteArray(length, (uint8_t*)buf->data() + offset);
buf->meta_data()->writeToParcel(*reply);
inlineTransferSize += length;
if (inlineTransferSize > kInlineMaxTransfer) {
maxNumBuffers = 0; // stop readMultiple if inline transfer is too large.
}
}
buf->release();
}
reply->writeInt32(NULL_BUFFER); // Indicate no more MediaBuffers.
reply->writeInt32(ret);
ALOGV("readMultiple status %d, bufferCount %u, sinceStop %u",
ret, bufferCount, mBuffersSinceStop);
return NO_ERROR;
}
case SUPPORT_NONBLOCKING_READ: {
ALOGV("supportNonblockingRead");
CHECK_INTERFACE(IMediaSource, data, reply);
reply->writeInt32((int32_t)supportNonblockingRead());
return NO_ERROR;
}
default:
return BBinder::onTransact(code, data, reply, flags);
}
}
////////////////////////////////////////////////////////////////////////////////
IMediaSource::ReadOptions::ReadOptions() {
reset();
}
void IMediaSource::ReadOptions::reset() {
mOptions = 0;
mSeekTimeUs = 0;
mLatenessUs = 0;
mNonBlocking = false;
}
void IMediaSource::ReadOptions::setNonBlocking() {
mNonBlocking = true;
}
void IMediaSource::ReadOptions::clearNonBlocking() {
mNonBlocking = false;
}
bool IMediaSource::ReadOptions::getNonBlocking() const {
return mNonBlocking;
}
void IMediaSource::ReadOptions::setSeekTo(int64_t time_us, SeekMode mode) {
mOptions |= kSeekTo_Option;
mSeekTimeUs = time_us;
mSeekMode = mode;
}
void IMediaSource::ReadOptions::clearSeekTo() {
mOptions &= ~kSeekTo_Option;
mSeekTimeUs = 0;
mSeekMode = SEEK_CLOSEST_SYNC;
}
bool IMediaSource::ReadOptions::getSeekTo(
int64_t *time_us, SeekMode *mode) const {
*time_us = mSeekTimeUs;
*mode = mSeekMode;
return (mOptions & kSeekTo_Option) != 0;
}
void IMediaSource::ReadOptions::setLateBy(int64_t lateness_us) {
mLatenessUs = lateness_us;
}
int64_t IMediaSource::ReadOptions::getLateBy() const {
return mLatenessUs;
}
} // namespace android