/* * Copyright (C) 2017 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "SimpleC2Component" #include <log/log.h> #include <cutils/properties.h> #include <inttypes.h> #include <C2Config.h> #include <C2Debug.h> #include <C2PlatformSupport.h> #include <SimpleC2Component.h> namespace android { std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() { std::unique_ptr<C2Work> work = std::move(mQueue.front().work); mQueue.pop_front(); return work; } void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) { mQueue.push_back({ std::move(work), NO_DRAIN }); } bool SimpleC2Component::WorkQueue::empty() const { return mQueue.empty(); } void SimpleC2Component::WorkQueue::clear() { mQueue.clear(); } uint32_t SimpleC2Component::WorkQueue::drainMode() const { return mQueue.front().drainMode; } void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) { mQueue.push_back({ nullptr, drainMode }); } //////////////////////////////////////////////////////////////////////////////// namespace { struct DummyReadView : public C2ReadView { DummyReadView() : C2ReadView(C2_NO_INIT) {} }; } // namespace SimpleC2Component::SimpleC2Component( const std::shared_ptr<C2ComponentInterface> &intf) : mDummyReadView(DummyReadView()), mIntf(intf) { } c2_status_t SimpleC2Component::setListener_vb( const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) { Mutexed<ExecState>::Locked state(mExecState); if (state->mState == RUNNING) { if (listener) { return C2_BAD_STATE; } else if (!mayBlock) { return C2_BLOCKING; } } state->mListener = listener; // TODO: wait for listener change to have taken place before returning // (e.g. if there is an ongoing listener callback) return C2_OK; } c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) { { Mutexed<ExecState>::Locked state(mExecState); if (state->mState != RUNNING) { return C2_BAD_STATE; } } { Mutexed<WorkQueue>::Locked queue(mWorkQueue); while (!items->empty()) { queue->push_back(std::move(items->front())); items->pop_front(); } queue->mCondition.broadcast(); } return C2_OK; } c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) { (void)items; return C2_OMITTED; } c2_status_t SimpleC2Component::flush_sm( flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) { (void)flushMode; { Mutexed<ExecState>::Locked state(mExecState); if (state->mState != RUNNING) { return C2_BAD_STATE; } } { Mutexed<WorkQueue>::Locked queue(mWorkQueue); queue->incGeneration(); // TODO: queue->splicedBy(flushedWork, flushedWork->end()); while (!queue->empty()) { std::unique_ptr<C2Work> work = queue->pop_front(); if (work) { flushedWork->push_back(std::move(work)); } } } { Mutexed<PendingWork>::Locked pending(mPendingWork); while (!pending->empty()) { flushedWork->push_back(std::move(pending->begin()->second)); pending->erase(pending->begin()); } } return C2_OK; } c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) { if (drainMode == DRAIN_CHAIN) { return C2_OMITTED; } { Mutexed<ExecState>::Locked state(mExecState); if (state->mState != RUNNING) { return C2_BAD_STATE; } } { Mutexed<WorkQueue>::Locked queue(mWorkQueue); queue->markDrain(drainMode); queue->mCondition.broadcast(); } return C2_OK; } c2_status_t SimpleC2Component::start() { Mutexed<ExecState>::Locked state(mExecState); if (state->mState == RUNNING) { return C2_BAD_STATE; } bool needsInit = (state->mState == UNINITIALIZED); if (needsInit) { state.unlock(); c2_status_t err = onInit(); if (err != C2_OK) { return err; } state.lock(); } if (!state->mThread.joinable()) { mExitRequested = false; { Mutexed<ExitMonitor>::Locked monitor(mExitMonitor); monitor->mExited = false; } state->mThread = std::thread( [](std::weak_ptr<SimpleC2Component> wp) { while (true) { std::shared_ptr<SimpleC2Component> thiz = wp.lock(); if (!thiz) { return; } if (thiz->exitRequested()) { ALOGV("stop processing"); thiz->signalExit(); return; } thiz->processQueue(); } }, shared_from_this()); } state->mState = RUNNING; return C2_OK; } void SimpleC2Component::signalExit() { Mutexed<ExitMonitor>::Locked monitor(mExitMonitor); monitor->mExited = true; monitor->mCondition.broadcast(); } void SimpleC2Component::requestExitAndWait(std::function<void()> job) { { Mutexed<ExecState>::Locked state(mExecState); if (!state->mThread.joinable()) { return; } } mExitRequested = true; { Mutexed<WorkQueue>::Locked queue(mWorkQueue); queue->mCondition.broadcast(); } // TODO: timeout? { Mutexed<ExitMonitor>::Locked monitor(mExitMonitor); while (!monitor->mExited) { monitor.waitForCondition(monitor->mCondition); } job(); } Mutexed<ExecState>::Locked state(mExecState); if (state->mThread.joinable()) { ALOGV("joining the processing thread"); state->mThread.join(); ALOGV("joined the processing thread"); } } c2_status_t SimpleC2Component::stop() { ALOGV("stop"); { Mutexed<ExecState>::Locked state(mExecState); if (state->mState != RUNNING) { return C2_BAD_STATE; } state->mState = STOPPED; } { Mutexed<WorkQueue>::Locked queue(mWorkQueue); queue->clear(); } { Mutexed<PendingWork>::Locked pending(mPendingWork); pending->clear(); } c2_status_t err; requestExitAndWait([this, &err]{ err = onStop(); }); if (err != C2_OK) { return err; } return C2_OK; } c2_status_t SimpleC2Component::reset() { ALOGV("reset"); { Mutexed<ExecState>::Locked state(mExecState); state->mState = UNINITIALIZED; } { Mutexed<WorkQueue>::Locked queue(mWorkQueue); queue->clear(); } { Mutexed<PendingWork>::Locked pending(mPendingWork); pending->clear(); } requestExitAndWait([this]{ onReset(); }); return C2_OK; } c2_status_t SimpleC2Component::release() { ALOGV("release"); requestExitAndWait([this]{ onRelease(); }); return C2_OK; } std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() { return mIntf; } namespace { std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) { std::list<std::unique_ptr<C2Work>> ret; ret.push_back(std::move(work)); return ret; } } // namespace void SimpleC2Component::finish( uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) { std::unique_ptr<C2Work> work; { Mutexed<PendingWork>::Locked pending(mPendingWork); if (pending->count(frameIndex) == 0) { ALOGW("unknown frame index: %" PRIu64, frameIndex); return; } work = std::move(pending->at(frameIndex)); pending->erase(frameIndex); } if (work) { fillWork(work); Mutexed<ExecState>::Locked state(mExecState); std::shared_ptr<C2Component::Listener> listener = state->mListener; state.unlock(); listener->onWorkDone_nb(shared_from_this(), vec(work)); ALOGV("returning pending work"); } } void SimpleC2Component::processQueue() { std::unique_ptr<C2Work> work; uint64_t generation; int32_t drainMode; bool isFlushPending = false; { Mutexed<WorkQueue>::Locked queue(mWorkQueue); nsecs_t deadline = systemTime() + ms2ns(250); while (queue->empty()) { if (exitRequested()) { return; } nsecs_t now = systemTime(); if (now >= deadline) { return; } status_t err = queue.waitForConditionRelative(queue->mCondition, deadline - now); if (err == TIMED_OUT) { return; } } generation = queue->generation(); drainMode = queue->drainMode(); isFlushPending = queue->popPendingFlush(); work = queue->pop_front(); } if (isFlushPending) { ALOGV("processing pending flush"); c2_status_t err = onFlush_sm(); if (err != C2_OK) { ALOGD("flush err: %d", err); // TODO: error } } if (!mOutputBlockPool) { c2_status_t err = [this] { // TODO: don't use query_vb C2StreamFormatConfig::output outputFormat(0u); std::vector<std::unique_ptr<C2Param>> params; c2_status_t err = intf()->query_vb( { &outputFormat }, { C2PortBlockPoolsTuning::output::PARAM_TYPE }, C2_DONT_BLOCK, ¶ms); if (err != C2_OK && err != C2_BAD_INDEX) { ALOGD("query err = %d", err); return err; } C2BlockPool::local_id_t poolId = outputFormat.value == C2FormatVideo ? C2BlockPool::BASIC_GRAPHIC : C2BlockPool::BASIC_LINEAR; if (params.size()) { C2PortBlockPoolsTuning::output *outputPools = C2PortBlockPoolsTuning::output::From(params[0].get()); if (outputPools && outputPools->flexCount() >= 1) { poolId = outputPools->m.values[0]; } } err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool); ALOGD("Using output block pool with poolID %llu => got %llu - %d", (unsigned long long)poolId, (unsigned long long)( mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111), err); return err; }(); if (err != C2_OK) { Mutexed<ExecState>::Locked state(mExecState); std::shared_ptr<C2Component::Listener> listener = state->mListener; state.unlock(); listener->onError_nb(shared_from_this(), err); return; } } if (!work) { c2_status_t err = drain(drainMode, mOutputBlockPool); if (err != C2_OK) { Mutexed<ExecState>::Locked state(mExecState); std::shared_ptr<C2Component::Listener> listener = state->mListener; state.unlock(); listener->onError_nb(shared_from_this(), err); } return; } process(work, mOutputBlockPool); ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku()); { Mutexed<WorkQueue>::Locked queue(mWorkQueue); if (queue->generation() != generation) { ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64, queue->generation(), generation); work->result = C2_NOT_FOUND; queue.unlock(); { Mutexed<ExecState>::Locked state(mExecState); std::shared_ptr<C2Component::Listener> listener = state->mListener; state.unlock(); listener->onWorkDone_nb(shared_from_this(), vec(work)); } queue.lock(); return; } } if (work->workletsProcessed != 0u) { Mutexed<ExecState>::Locked state(mExecState); ALOGV("returning this work"); std::shared_ptr<C2Component::Listener> listener = state->mListener; state.unlock(); listener->onWorkDone_nb(shared_from_this(), vec(work)); } else { ALOGV("queue pending work"); std::unique_ptr<C2Work> unexpected; { Mutexed<PendingWork>::Locked pending(mPendingWork); uint64_t frameIndex = work->input.ordinal.frameIndex.peeku(); if (pending->count(frameIndex) != 0) { unexpected = std::move(pending->at(frameIndex)); pending->erase(frameIndex); } (void)pending->insert({ frameIndex, std::move(work) }); } if (unexpected) { ALOGD("unexpected pending work"); unexpected->result = C2_CORRUPTED; Mutexed<ExecState>::Locked state(mExecState); std::shared_ptr<C2Component::Listener> listener = state->mListener; state.unlock(); listener->onWorkDone_nb(shared_from_this(), vec(unexpected)); } } } std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer( const std::shared_ptr<C2LinearBlock> &block) { return createLinearBuffer(block, block->offset(), block->size()); } std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer( const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) { return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence())); } std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer( const std::shared_ptr<C2GraphicBlock> &block) { return createGraphicBuffer(block, C2Rect(block->width(), block->height())); } std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer( const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) { return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence())); } } // namespace android