// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_RX_SCHEDULER_NEW_THREAD_HPP)
#define RXCPP_RX_SCHEDULER_NEW_THREAD_HPP

#include "../rx-includes.hpp"

namespace rxcpp {

namespace schedulers {

typedef std::function<std::thread(std::function<void()>)> thread_factory;

struct new_thread : public scheduler_interface
{
private:
    typedef new_thread this_type;
    new_thread(const this_type&);

    struct new_worker : public worker_interface
    {
    private:
        typedef new_worker this_type;

        typedef detail::action_queue queue_type;

        new_worker(const this_type&);

        struct new_worker_state : public std::enable_shared_from_this<new_worker_state>
        {
            typedef detail::schedulable_queue<
                typename clock_type::time_point> queue_item_time;

            typedef queue_item_time::item_type item_type;

            virtual ~new_worker_state()
            {
            }

            explicit new_worker_state(composite_subscription cs)
                : lifetime(cs)
            {
            }

            composite_subscription lifetime;
            mutable std::mutex lock;
            mutable std::condition_variable wake;
            mutable queue_item_time q;
            std::thread worker;
            recursion r;
        };

        std::shared_ptr<new_worker_state> state;

    public:
        virtual ~new_worker()
        {
        }

        explicit new_worker(std::shared_ptr<new_worker_state> ws)
            : state(ws)
        {
        }

        new_worker(composite_subscription cs, thread_factory& tf)
            : state(std::make_shared<new_worker_state>(cs))
        {
            auto keepAlive = state;

            state->lifetime.add([keepAlive](){
                std::unique_lock<std::mutex> guard(keepAlive->lock);
                auto expired = std::move(keepAlive->q);
                keepAlive->q = new_worker_state::queue_item_time{};
                if (!keepAlive->q.empty()) std::terminate();
                keepAlive->wake.notify_one();

                if (keepAlive->worker.joinable() && keepAlive->worker.get_id() != std::this_thread::get_id()) {
                    guard.unlock();
                    keepAlive->worker.join();
                }
                else {
                    keepAlive->worker.detach();
                }
            });

            state->worker = tf([keepAlive](){

                // take ownership
                queue_type::ensure(std::make_shared<new_worker>(keepAlive));
                // release ownership
                RXCPP_UNWIND_AUTO([]{
                    queue_type::destroy();
                });

                for(;;) {
                    std::unique_lock<std::mutex> guard(keepAlive->lock);
                    if (keepAlive->q.empty()) {
                        keepAlive->wake.wait(guard, [keepAlive](){
                            return !keepAlive->lifetime.is_subscribed() || !keepAlive->q.empty();
                        });
                    }
                    if (!keepAlive->lifetime.is_subscribed()) {
                        break;
                    }
                    auto& peek = keepAlive->q.top();
                    if (!peek.what.is_subscribed()) {
                        keepAlive->q.pop();
                        continue;
                    }
                    if (clock_type::now() < peek.when) {
                        keepAlive->wake.wait_until(guard, peek.when);
                        continue;
                    }
                    auto what = peek.what;
                    keepAlive->q.pop();
                    keepAlive->r.reset(keepAlive->q.empty());
                    guard.unlock();
                    what(keepAlive->r.get_recurse());
                }
            });
        }

        virtual clock_type::time_point now() const {
            return clock_type::now();
        }

        virtual void schedule(const schedulable& scbl) const {
            schedule(now(), scbl);
        }

        virtual void schedule(clock_type::time_point when, const schedulable& scbl) const {
            if (scbl.is_subscribed()) {
                std::unique_lock<std::mutex> guard(state->lock);
                state->q.push(new_worker_state::item_type(when, scbl));
                state->r.reset(false);
            }
            state->wake.notify_one();
        }
    };

    mutable thread_factory factory;

public:
    new_thread()
        : factory([](std::function<void()> start){
            return std::thread(std::move(start));
        })
    {
    }
    explicit new_thread(thread_factory tf)
        : factory(tf)
    {
    }
    virtual ~new_thread()
    {
    }

    virtual clock_type::time_point now() const {
        return clock_type::now();
    }

    virtual worker create_worker(composite_subscription cs) const {
        return worker(cs, std::make_shared<new_worker>(cs, factory));
    }
};

inline scheduler make_new_thread() {
    static scheduler instance = make_scheduler<new_thread>();
    return instance;
}
inline scheduler make_new_thread(thread_factory tf) {
    return make_scheduler<new_thread>(tf);
}

}

}

#endif