/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

//#define LOG_NDEBUG 0
#define LOG_TAG "SimpleC2Component"
#include <log/log.h>

#include <cutils/properties.h>

#include <inttypes.h>

#include <C2Config.h>
#include <C2Debug.h>
#include <C2PlatformSupport.h>
#include <SimpleC2Component.h>

namespace android {

/**
 * Removes the head entry of the queue and returns the work it carried.
 *
 * Entries added through markDrain() carry no work, so the returned pointer is
 * null for those. The queue must not be empty when this is called.
 */
std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
    // Take ownership of the payload before the entry itself is discarded.
    auto &head = mQueue.front();
    std::unique_ptr<C2Work> popped(std::move(head.work));
    mQueue.pop_front();
    return popped;
}

void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
    mQueue.push_back({ std::move(work), NO_DRAIN });
}

/**
 * Reports whether the queue currently holds no entries (neither work items
 * nor drain markers).
 */
bool SimpleC2Component::WorkQueue::empty() const {
    const bool hasNoEntries = mQueue.empty();
    return hasNoEntries;
}

/**
 * Drops every entry in the queue, work items and drain markers alike.
 * Work owned by the dropped entries is released with them.
 */
void SimpleC2Component::WorkQueue::clear() {
    mQueue.clear();
}

/**
 * Returns the drain mode recorded on the head entry of the queue.
 * The queue must not be empty when this is called.
 */
uint32_t SimpleC2Component::WorkQueue::drainMode() const {
    const auto &head = mQueue.front();
    return head.drainMode;
}

void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
    mQueue.push_back({ nullptr, drainMode });
}

////////////////////////////////////////////////////////////////////////////////

namespace {

/**
 * Placeholder read view whose only job is to forward C2_NO_INIT to the
 * C2ReadView base constructor. The component constructor uses it to
 * initialize mDummyReadView.
 */
class DummyReadView : public C2ReadView {
public:
    DummyReadView() : C2ReadView(C2_NO_INIT) {}
};

}  // namespace

/**
 * Builds a component around the given interface.
 *
 * mDummyReadView is initialized from a DummyReadView temporary and the
 * interface pointer is retained in mIntf; nothing else happens here.
 */
SimpleC2Component::SimpleC2Component(
        const std::shared_ptr<C2ComponentInterface> &intf)
    : mDummyReadView(DummyReadView()),
      mIntf(intf) {
    // Intentionally empty: all setup is done in the initializer list.
}

/**
 * Installs (or clears, when |listener| is null) the component listener.
 *
 * While the component is RUNNING:
 *  - a non-null listener is rejected with C2_BAD_STATE;
 *  - clearing the listener is only permitted when |mayBlock| is set,
 *    otherwise C2_BLOCKING is returned.
 * In every other case the listener is stored and C2_OK is returned.
 */
c2_status_t SimpleC2Component::setListener_vb(
        const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
    Mutexed<ExecState>::Locked execState(mExecState);
    const bool isRunning = (execState->mState == RUNNING);
    if (isRunning && listener) {
        return C2_BAD_STATE;
    }
    if (isRunning && !mayBlock) {
        return C2_BLOCKING;
    }
    execState->mListener = listener;
    // TODO: wait for listener change to have taken place before returning
    // (e.g. if there is an ongoing listener callback)
    return C2_OK;
}

/**
 * Moves every work item from |items| onto the work queue and wakes up any
 * waiter on the queue condition.
 *
 * Returns C2_BAD_STATE (leaving |items| untouched) unless the component is
 * RUNNING; otherwise |items| is left empty and C2_OK is returned.
 */
c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
    {
        Mutexed<ExecState>::Locked execState(mExecState);
        if (execState->mState != RUNNING) {
            return C2_BAD_STATE;
        }
    }

    Mutexed<WorkQueue>::Locked workQueue(mWorkQueue);
    // Hand the items over in their original order, then drop the emptied slots.
    for (std::unique_ptr<C2Work> &item : *items) {
        workQueue->push_back(std::move(item));
    }
    items->clear();
    workQueue->mCondition.broadcast();
    return C2_OK;
}

/**
 * Work announcement is not implemented by this component: the outlines are
 * ignored and C2_OMITTED is always returned.
 */
c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
    static_cast<void>(items);  // unused
    return C2_OMITTED;
}

/**
 * Flushes all work known to the component into |flushedWork|.
 *
 * Queued work is collected first (in queue order), followed by work that was
 * parked as pending. Queue entries without work (drain markers) are dropped.
 * The queue generation is advanced via incGeneration() before the queue is
 * emptied. |flushMode| is not consulted.
 *
 * Returns C2_BAD_STATE unless the component is RUNNING, C2_OK otherwise.
 */
c2_status_t SimpleC2Component::flush_sm(
        flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
    static_cast<void>(flushMode);

    {
        Mutexed<ExecState>::Locked execState(mExecState);
        if (execState->mState != RUNNING) {
            return C2_BAD_STATE;
        }
    }

    {
        Mutexed<WorkQueue>::Locked workQueue(mWorkQueue);
        workQueue->incGeneration();
        // TODO: queue->splicedBy(flushedWork, flushedWork->end());
        while (!workQueue->empty()) {
            std::unique_ptr<C2Work> queued = workQueue->pop_front();
            if (!queued) {
                // Drain marker: nothing to hand back.
                continue;
            }
            flushedWork->push_back(std::move(queued));
        }
    }

    {
        Mutexed<PendingWork>::Locked pendingWork(mPendingWork);
        for (auto it = pendingWork->begin(); it != pendingWork->end(); ++it) {
            flushedWork->push_back(std::move(it->second));
        }
        pendingWork->clear();
    }

    return C2_OK;
}

/**
 * Requests a drain by appending a drain marker to the work queue and waking
 * up any waiter on the queue condition.
 *
 * DRAIN_CHAIN is not supported and yields C2_OMITTED. Any other mode returns
 * C2_BAD_STATE unless the component is RUNNING, and C2_OK once the marker has
 * been queued.
 */
c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
    const bool isChainDrain = (drainMode == DRAIN_CHAIN);
    if (isChainDrain) {
        return C2_OMITTED;
    }

    {
        Mutexed<ExecState>::Locked execState(mExecState);
        if (execState->mState != RUNNING) {
            return C2_BAD_STATE;
        }
    }

    Mutexed<WorkQueue>::Locked workQueue(mWorkQueue);
    workQueue->markDrain(drainMode);
    workQueue->mCondition.broadcast();
    return C2_OK;
}

/**
 * Moves the component to the RUNNING state.
 *
 * If the component is UNINITIALIZED, onInit() is called first. If no
 * processing thread is currently joinable, the exit flags are reset and a new
 * thread is spawned that repeatedly calls processQueue() until an exit is
 * requested.
 *
 * Returns C2_BAD_STATE if already RUNNING, the error reported by onInit() if
 * initialization fails, and C2_OK otherwise.
 */
c2_status_t SimpleC2Component::start() {
    Mutexed<ExecState>::Locked state(mExecState);
    if (state->mState == RUNNING) {
        return C2_BAD_STATE;
    }
    bool needsInit = (state->mState == UNINITIALIZED);
    if (needsInit) {
        // onInit() is invoked without the state lock held.
        // NOTE(review): while unlocked, another caller could observe the same
        // UNINITIALIZED state — confirm that callers serialize start().
        state.unlock();
        c2_status_t err = onInit();
        if (err != C2_OK) {
            // Returned with the state lock released and mState unchanged.
            return err;
        }
        state.lock();
    }
    if (!state->mThread.joinable()) {
        // Reset both exit indicators before the new thread can look at them.
        mExitRequested = false;
        {
            Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
            monitor->mExited = false;
        }
        // NOTE(review): shared_from_this() is passed as the thread argument and
        // only converted to a weak_ptr at the lambda's parameter. std::thread
        // keeps decayed copies of its arguments, so this presumably keeps a
        // strong reference alive for the thread's lifetime — confirm intent.
        state->mThread = std::thread(
                [](std::weak_ptr<SimpleC2Component> wp) {
                    while (true) {
                        // Re-acquire a strong reference on every iteration.
                        std::shared_ptr<SimpleC2Component> thiz = wp.lock();
                        if (!thiz) {
                            return;
                        }
                        if (thiz->exitRequested()) {
                            ALOGV("stop processing");
                            // Unblocks requestExitAndWait().
                            thiz->signalExit();
                            return;
                        }
                        thiz->processQueue();
                    }
                },
                shared_from_this());
    }
    state->mState = RUNNING;
    return C2_OK;
}

void SimpleC2Component::signalExit() {
    Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
    monitor->mExited = true;
    monitor->mCondition.broadcast();
}

/**
 * Asks the processing thread to exit, waits until it has signalled its exit,
 * runs |job|, and finally joins the thread.
 *
 * If no processing thread is joinable, this returns immediately and |job| is
 * NOT invoked.
 * NOTE(review): reset() and release() pass onReset()/onRelease() as |job|, so
 * those hooks are skipped when the thread was already joined (e.g. after
 * stop()) — confirm this is intended.
 */
void SimpleC2Component::requestExitAndWait(std::function<void()> job) {
    {
        Mutexed<ExecState>::Locked state(mExecState);
        if (!state->mThread.joinable()) {
            return;
        }
    }
    mExitRequested = true;
    {
        // Wake the processing thread in case it is waiting for work.
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        queue->mCondition.broadcast();
    }
    // TODO: timeout?
    {
        Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
        // signalExit() sets mExited and broadcasts this condition.
        while (!monitor->mExited) {
            monitor.waitForCondition(monitor->mCondition);
        }
        // The job runs inside the exit monitor's locked scope, after the
        // processing thread has reported its exit.
        job();
    }
    // The join below happens with the state lock held.
    Mutexed<ExecState>::Locked state(mExecState);
    if (state->mThread.joinable()) {
        ALOGV("joining the processing thread");
        state->mThread.join();
        ALOGV("joined the processing thread");
    }
}

/**
 * Stops the component.
 *
 * The state is switched from RUNNING to STOPPED, all queued and pending work
 * is discarded, and the processing thread is asked to exit; onStop() is run
 * as part of that shutdown.
 *
 * Returns C2_BAD_STATE if the component is not RUNNING, otherwise the result
 * of onStop() (C2_OK if the shutdown job did not get to run).
 */
c2_status_t SimpleC2Component::stop() {
    ALOGV("stop");
    {
        Mutexed<ExecState>::Locked state(mExecState);
        if (state->mState != RUNNING) {
            return C2_BAD_STATE;
        }
        state->mState = STOPPED;
    }
    {
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        queue->clear();
    }
    {
        Mutexed<PendingWork>::Locked pending(mPendingWork);
        pending->clear();
    }
    // requestExitAndWait() returns without invoking the job when there is no
    // joinable processing thread, so the status must start out well-defined
    // rather than being read uninitialized.
    c2_status_t err = C2_OK;
    requestExitAndWait([this, &err]{ err = onStop(); });
    return err;
}

/**
 * Resets the component.
 *
 * The state is forced to UNINITIALIZED regardless of the current state, all
 * queued and pending work is discarded, and the processing thread is shut
 * down with onReset() supplied as the shutdown job. Always returns C2_OK.
 */
c2_status_t SimpleC2Component::reset() {
    ALOGV("reset");
    {
        Mutexed<ExecState>::Locked execState(mExecState);
        execState->mState = UNINITIALIZED;
    }
    {
        Mutexed<WorkQueue>::Locked workQueue(mWorkQueue);
        workQueue->clear();
    }
    {
        Mutexed<PendingWork>::Locked pendingWork(mPendingWork);
        pendingWork->clear();
    }
    const auto resetJob = [this] { onReset(); };
    requestExitAndWait(resetJob);
    return C2_OK;
}

/**
 * Releases the component: the processing thread is shut down with
 * onRelease() supplied as the shutdown job. Always returns C2_OK.
 */
c2_status_t SimpleC2Component::release() {
    ALOGV("release");
    const auto releaseJob = [this] { onRelease(); };
    requestExitAndWait(releaseJob);
    return C2_OK;
}

/**
 * Returns the component interface that was supplied at construction time.
 */
std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
    std::shared_ptr<C2ComponentInterface> componentIntf = mIntf;
    return componentIntf;
}

namespace {

std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
    std::list<std::unique_ptr<C2Work>> ret;
    ret.push_back(std::move(work));
    return ret;
}

}  // namespace

/**
 * Completes a piece of pending work and reports it to the listener.
 *
 * The work registered under |frameIndex| is removed from the pending set,
 * passed to |fillWork| so the caller can populate its results, and then
 * delivered through the listener's onWorkDone_nb().
 *
 * If no pending work is registered under |frameIndex| a warning is logged and
 * nothing else happens. If no listener is installed (it may be cleared via
 * setListener_vb()), the filled work is dropped with a warning instead of
 * dereferencing a null listener.
 */
void SimpleC2Component::finish(
        uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
    std::unique_ptr<C2Work> work;
    {
        Mutexed<PendingWork>::Locked pending(mPendingWork);
        // Single lookup instead of count() + at() + erase(key).
        auto it = pending->find(frameIndex);
        if (it == pending->end()) {
            ALOGW("unknown frame index: %" PRIu64, frameIndex);
            return;
        }
        work = std::move(it->second);
        pending->erase(it);
    }
    if (!work) {
        return;
    }

    fillWork(work);

    // Snapshot the listener under the state lock; the callback runs unlocked.
    std::shared_ptr<C2Component::Listener> listener;
    {
        Mutexed<ExecState>::Locked state(mExecState);
        listener = state->mListener;
    }
    if (!listener) {
        ALOGW("no listener to receive finished work for frame index: %" PRIu64, frameIndex);
        return;
    }
    listener->onWorkDone_nb(shared_from_this(), vec(work));
    ALOGV("returning pending work");
}

void SimpleC2Component::processQueue() {
    std::unique_ptr<C2Work> work;
    uint64_t generation;
    int32_t drainMode;
    bool isFlushPending = false;
    {
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        nsecs_t deadline = systemTime() + ms2ns(250);
        while (queue->empty()) {
            if (exitRequested()) {
                return;
            }
            nsecs_t now = systemTime();
            if (now >= deadline) {
                return;
            }
            status_t err = queue.waitForConditionRelative(queue->mCondition, deadline - now);
            if (err == TIMED_OUT) {
                return;
            }
        }

        generation = queue->generation();
        drainMode = queue->drainMode();
        isFlushPending = queue->popPendingFlush();
        work = queue->pop_front();
    }
    if (isFlushPending) {
        ALOGV("processing pending flush");
        c2_status_t err = onFlush_sm();
        if (err != C2_OK) {
            ALOGD("flush err: %d", err);
            // TODO: error
        }
    }

    if (!mOutputBlockPool) {
        c2_status_t err = [this] {
            // TODO: don't use query_vb
            C2StreamFormatConfig::output outputFormat(0u);
            std::vector<std::unique_ptr<C2Param>> params;
            c2_status_t err = intf()->query_vb(
                    { &outputFormat },
                    { C2PortBlockPoolsTuning::output::PARAM_TYPE },
                    C2_DONT_BLOCK,
                    &params);
            if (err != C2_OK && err != C2_BAD_INDEX) {
                ALOGD("query err = %d", err);
                return err;
            }
            C2BlockPool::local_id_t poolId =
                outputFormat.value == C2FormatVideo
                        ? C2BlockPool::BASIC_GRAPHIC
                        : C2BlockPool::BASIC_LINEAR;
            if (params.size()) {
                C2PortBlockPoolsTuning::output *outputPools =
                    C2PortBlockPoolsTuning::output::From(params[0].get());
                if (outputPools && outputPools->flexCount() >= 1) {
                    poolId = outputPools->m.values[0];
                }
            }

            err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool);
            ALOGD("Using output block pool with poolID %llu => got %llu - %d",
                    (unsigned long long)poolId,
                    (unsigned long long)(
                            mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111),
                    err);
            return err;
        }();
        if (err != C2_OK) {
            Mutexed<ExecState>::Locked state(mExecState);
            std::shared_ptr<C2Component::Listener> listener = state->mListener;
            state.unlock();
            listener->onError_nb(shared_from_this(), err);
            return;
        }
    }

    if (!work) {
        c2_status_t err = drain(drainMode, mOutputBlockPool);
        if (err != C2_OK) {
            Mutexed<ExecState>::Locked state(mExecState);
            std::shared_ptr<C2Component::Listener> listener = state->mListener;
            state.unlock();
            listener->onError_nb(shared_from_this(), err);
        }
        return;
    }

    process(work, mOutputBlockPool);
    ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
    {
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        if (queue->generation() != generation) {
            ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
                    queue->generation(), generation);
            work->result = C2_NOT_FOUND;
            queue.unlock();
            {
                Mutexed<ExecState>::Locked state(mExecState);
                std::shared_ptr<C2Component::Listener> listener = state->mListener;
                state.unlock();
                listener->onWorkDone_nb(shared_from_this(), vec(work));
            }
            queue.lock();
            return;
        }
    }
    if (work->workletsProcessed != 0u) {
        Mutexed<ExecState>::Locked state(mExecState);
        ALOGV("returning this work");
        std::shared_ptr<C2Component::Listener> listener = state->mListener;
        state.unlock();
        listener->onWorkDone_nb(shared_from_this(), vec(work));
    } else {
        ALOGV("queue pending work");
        std::unique_ptr<C2Work> unexpected;
        {
            Mutexed<PendingWork>::Locked pending(mPendingWork);
            uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
            if (pending->count(frameIndex) != 0) {
                unexpected = std::move(pending->at(frameIndex));
                pending->erase(frameIndex);
            }
            (void)pending->insert({ frameIndex, std::move(work) });
        }
        if (unexpected) {
            ALOGD("unexpected pending work");
            unexpected->result = C2_CORRUPTED;
            Mutexed<ExecState>::Locked state(mExecState);
            std::shared_ptr<C2Component::Listener> listener = state->mListener;
            state.unlock();
            listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
        }
    }
}

/**
 * Creates a linear buffer that covers the block's own offset and size.
 * Forwards to the (block, offset, size) overload.
 */
std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
        const std::shared_ptr<C2LinearBlock> &block) {
    const auto blockOffset = block->offset();
    const auto blockSize = block->size();
    return createLinearBuffer(block, blockOffset, blockSize);
}

/**
 * Creates a linear buffer from the [offset, offset + size) portion of |block|.
 * The portion is shared from the block with a default-constructed fence.
 */
std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
        const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
    const auto sharedPortion = block->share(offset, size, ::C2Fence());
    return C2Buffer::CreateLinearBuffer(sharedPortion);
}

std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
        const std::shared_ptr<C2GraphicBlock> &block) {
    return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
}

/**
 * Creates a graphic buffer from the |crop| region of |block|.
 * The region is shared from the block with a default-constructed fence.
 */
std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
        const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
    const auto sharedRegion = block->share(crop, ::C2Fence());
    return C2Buffer::CreateGraphicBuffer(sharedRegion);
}

} // namespace android