// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_RX_SCHEDULER_HPP)
#define RXCPP_RX_SCHEDULER_HPP

#include "rx-includes.hpp"

namespace rxcpp {

namespace schedulers {

class worker_interface;
class scheduler_interface;

namespace detail {

class action_type;
typedef std::shared_ptr<action_type> action_ptr;

typedef std::shared_ptr<worker_interface> worker_interface_ptr;
typedef std::shared_ptr<const worker_interface> const_worker_interface_ptr;

typedef std::weak_ptr<worker_interface> worker_interface_weak_ptr;
typedef std::weak_ptr<const worker_interface> const_worker_interface_weak_ptr;

typedef std::shared_ptr<scheduler_interface> scheduler_interface_ptr;
typedef std::shared_ptr<const scheduler_interface> const_scheduler_interface_ptr;

inline action_ptr shared_empty() {
    static action_ptr shared_empty = std::make_shared<detail::action_type>();
    return shared_empty;
}

}

// It is essential to keep virtual function calls out of an inner loop.
// To make tail-recursion work efficiently the recursion objects create
// a space on the stack inside the virtual function call in the actor that
// allows the callback and the scheduler to share stack space that records
// the request and the allowance without any virtual calls in the loop.

/// recursed is set on a schedulable by the action to allow the called
/// function to request to be rescheduled.
class recursed
{
    bool& isrequested;
    recursed operator=(const recursed&);
public:
    explicit recursed(bool& r)
        : isrequested(r)
    {
    }
    /// request to be rescheduled
    inline void operator()() const {
        isrequested = true;
    }
};

/// recurse is passed to the action by the scheduler.
/// the action uses recurse to coordinate the scheduler and the function.
class recurse
{
    bool& isallowed;
    mutable bool isrequested;
    recursed requestor;
    recurse operator=(const recurse&);
public:
    explicit recurse(bool& a)
        : isallowed(a)
        , isrequested(true)
        , requestor(isrequested)
    {
    }
    /// does the scheduler allow tail-recursion now?
    inline bool is_allowed() const {
        return isallowed;
    }
    /// did the function request to be recursed?
    inline bool is_requested() const {
        return isrequested;
    }
    /// reset the function request. call before each call to the function.
    inline void reset() const {
        isrequested = false;
    }
    /// get the recursed to set into the schedulable for the function to use to request recursion
    inline const recursed& get_recursed() const {
        return requestor;
    }
};

/// recursion is used by the scheduler to signal to each action whether tail recursion is allowed.
class recursion
{
    mutable bool isallowed;
    recurse recursor;
    recursion operator=(const recursion&);
public:
    recursion()
        : isallowed(true)
        , recursor(isallowed)
    {
    }
    explicit recursion(bool b)
        : isallowed(b)
        , recursor(isallowed)
    {
    }
    /// set whether tail-recursion is allowed
    inline void reset(bool b = true) const {
        isallowed = b;
    }
    /// get the recurse to pass into each action being called
    inline const recurse& get_recurse() const {
        return recursor;
    }
};


struct action_base
{
    typedef tag_action action_tag;
};

class schedulable;

/// action provides type-forgetting for a potentially recursive set of calls to a function that takes a schedulable
class action : public action_base
{
    typedef action this_type;
    detail::action_ptr inner;
public:
    action()
    {
    }
    explicit action(detail::action_ptr i)
    : inner(std::move(i))
    {
    }

    /// return the empty action
    inline static action empty() {
        return action(detail::shared_empty());
    }

    /// call the function
    inline void operator()(const schedulable& s, const recurse& r) const;
};

struct scheduler_base
{
    typedef std::chrono::steady_clock clock_type;
    typedef tag_scheduler scheduler_tag;
};

struct worker_base : public subscription_base
{
    typedef tag_worker worker_tag;
};

class worker_interface
    : public std::enable_shared_from_this<worker_interface>
{
    typedef worker_interface this_type;

public:
    typedef scheduler_base::clock_type clock_type;

    virtual ~worker_interface() {}

    virtual clock_type::time_point now() const = 0;

    virtual void schedule(const schedulable& scbl) const = 0;
    virtual void schedule(clock_type::time_point when, const schedulable& scbl) const = 0;
};

namespace detail {

template<class F>
struct is_action_function
{
    struct not_void {};
    template<class CF>
    static auto check(int) -> decltype((*(CF*)nullptr)(*(schedulable*)nullptr));
    template<class CF>
    static not_void check(...);

    static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
};

}

class weak_worker;

/// a worker ensures that all scheduled actions on the same instance are executed in-order with no overlap
/// a worker ensures that all scheduled actions are unsubscribed when it is unsubscribed
/// some inner implementations will impose additional constraints on the execution of items.
class worker : public worker_base
{
    typedef worker this_type;
    detail::worker_interface_ptr inner;
    composite_subscription lifetime;
    friend bool operator==(const worker&, const worker&);
    friend class weak_worker;
public:
    typedef scheduler_base::clock_type clock_type;
    typedef composite_subscription::weak_subscription weak_subscription;

    worker()
    {
    }
    worker(composite_subscription cs, detail::const_worker_interface_ptr i)
        : inner(std::const_pointer_cast<worker_interface>(i))
        , lifetime(std::move(cs))
    {
    }
    worker(composite_subscription cs, worker o)
        : inner(o.inner)
        , lifetime(std::move(cs))
    {
    }

    inline const composite_subscription& get_subscription() const {
        return lifetime;
    }
    inline composite_subscription& get_subscription() {
        return lifetime;
    }

    // composite_subscription
    //
    inline bool is_subscribed() const {
        return lifetime.is_subscribed();
    }
    inline weak_subscription add(subscription s) const {
        return lifetime.add(std::move(s));
    }
    inline void remove(weak_subscription w) const {
        return lifetime.remove(std::move(w));
    }
    inline void clear() const {
        return lifetime.clear();
    }
    inline void unsubscribe() const {
        return lifetime.unsubscribe();
    }

    // worker_interface
    //
    /// return the current time for this worker
    inline clock_type::time_point now() const {
        return inner->now();
    }

    /// insert the supplied schedulable to be run as soon as possible
    inline void schedule(const schedulable& scbl) const {
        // force rebinding scbl to this worker
        schedule_rebind(scbl);
    }

    /// insert the supplied schedulable to be run at the time specified
    inline void schedule(clock_type::time_point when, const schedulable& scbl) const {
        // force rebinding scbl to this worker
        schedule_rebind(when, scbl);
    }

    // helpers
    //

    /// insert the supplied schedulable to be run at now() + the delay specified
    inline void schedule(clock_type::duration when, const schedulable& scbl) const {
        // force rebinding scbl to this worker
        schedule_rebind(now() + when, scbl);
    }

    /// insert the supplied schedulable to be run at the initial time specified and then again at initial + (N * period)
    /// this will continue until the worker or schedulable is unsubscribed.
    inline void schedule_periodically(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl) const {
        // force rebinding scbl to this worker
        schedule_periodically_rebind(initial, period, scbl);
    }

    /// insert the supplied schedulable to be run at now() + the initial delay specified and then again at now() + initial + (N * period)
    /// this will continue until the worker or schedulable is unsubscribed.
    inline void schedule_periodically(clock_type::duration initial, clock_type::duration period, const schedulable& scbl) const {
        // force rebinding scbl to this worker
        schedule_periodically_rebind(now() + initial, period, scbl);
    }

    /// use the supplied arguments to make a schedulable and then insert it to be run
    template<class Arg0, class... ArgN>
    auto schedule(Arg0&& a0, ArgN&&... an) const
        -> typename std::enable_if<
            (detail::is_action_function<Arg0>::value ||
            is_subscription<Arg0>::value) &&
            !is_schedulable<Arg0>::value>::type;
    template<class... ArgN>
    /// use the supplied arguments to make a schedulable and then insert it to be run
    void schedule_rebind(const schedulable& scbl, ArgN&&... an) const;

    /// use the supplied arguments to make a schedulable and then insert it to be run
    template<class Arg0, class... ArgN>
    auto schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
        -> typename std::enable_if<
            (detail::is_action_function<Arg0>::value ||
            is_subscription<Arg0>::value) &&
            !is_schedulable<Arg0>::value>::type;
    /// use the supplied arguments to make a schedulable and then insert it to be run
    template<class... ArgN>
    void schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const;

    /// use the supplied arguments to make a schedulable and then insert it to be run
    template<class Arg0, class... ArgN>
    auto schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
        -> typename std::enable_if<
            (detail::is_action_function<Arg0>::value ||
            is_subscription<Arg0>::value) &&
            !is_schedulable<Arg0>::value>::type;
    /// use the supplied arguments to make a schedulable and then insert it to be run
    template<class... ArgN>
    void schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const;
};

inline bool operator==(const worker& lhs, const worker& rhs) {
    return lhs.inner == rhs.inner && lhs.lifetime == rhs.lifetime;
}
inline bool operator!=(const worker& lhs, const worker& rhs) {
    return !(lhs == rhs);
}

class weak_worker
{
    detail::worker_interface_weak_ptr inner;
    composite_subscription lifetime;

public:
    weak_worker()
    {
    }
    explicit weak_worker(worker& owner)
        : inner(owner.inner)
        , lifetime(owner.lifetime)
    {
    }

    worker lock() const {
        return worker(lifetime, inner.lock());
    }
};

class scheduler_interface
    : public std::enable_shared_from_this<scheduler_interface>
{
    typedef scheduler_interface this_type;

public:
    typedef scheduler_base::clock_type clock_type;

    virtual ~scheduler_interface() {}

    virtual clock_type::time_point now() const = 0;

    virtual worker create_worker(composite_subscription cs) const = 0;
};


struct schedulable_base :
    // public subscription_base, <- already in worker base
    public worker_base,
    public action_base
{
    typedef tag_schedulable schedulable_tag;
};

/*!
    \brief allows functions to be called at specified times and possibly in other contexts.

    \ingroup group-core

*/
class scheduler : public scheduler_base
{
    typedef scheduler this_type;
    detail::scheduler_interface_ptr inner;
    friend bool operator==(const scheduler&, const scheduler&);
public:
    typedef scheduler_base::clock_type clock_type;

    scheduler()
    {
    }
    explicit scheduler(detail::scheduler_interface_ptr i)
        : inner(std::move(i))
    {
    }
    explicit scheduler(detail::const_scheduler_interface_ptr i)
        : inner(std::const_pointer_cast<scheduler_interface>(i))
    {
    }

    /// return the current time for this scheduler
    inline clock_type::time_point now() const {
        return inner->now();
    }
    /// create a worker with a lifetime.
    /// when the worker is unsubscribed all scheduled items will be unsubscribed.
    /// items scheduled to a worker will be run one at a time.
    /// scheduling order is preserved: when more than one item is scheduled for
    /// time T then at time T they will be run in the order that they were scheduled.
    inline worker create_worker(composite_subscription cs = composite_subscription()) const {
        return inner->create_worker(cs);
    }
};

template<class Scheduler, class... ArgN>
inline scheduler make_scheduler(ArgN&&... an) {
    return scheduler(std::static_pointer_cast<scheduler_interface>(std::make_shared<Scheduler>(std::forward<ArgN>(an)...)));
}

inline scheduler make_scheduler(std::shared_ptr<scheduler_interface> si) {
    return scheduler(si);
}

class schedulable : public schedulable_base
{
    typedef schedulable this_type;

    composite_subscription lifetime;
    weak_worker controller;
    action activity;
    bool scoped;
    composite_subscription::weak_subscription action_scope;

    struct detacher
    {
        ~detacher()
        {
            if (that) {
                that->unsubscribe();
            }
        }
        detacher(const this_type* that)
            : that(that)
        {
        }
        const this_type* that;
    };

    class recursed_scope_type
    {
        mutable const recursed* requestor;

        class exit_recursed_scope_type
        {
            const recursed_scope_type* that;
        public:
            ~exit_recursed_scope_type()
            {
                if (that != nullptr) {
                    that->requestor = nullptr;
                }
            }
            exit_recursed_scope_type(const recursed_scope_type* that)
                : that(that)
            {
            }
            exit_recursed_scope_type(exit_recursed_scope_type && other) RXCPP_NOEXCEPT
                : that(other.that)
            {
                other.that = nullptr;
            }
        };
    public:
        recursed_scope_type()
            : requestor(nullptr)
        {
        }
        recursed_scope_type(const recursed_scope_type&)
            : requestor(nullptr)
        {
            // does not aquire recursion scope
        }
        recursed_scope_type& operator=(const recursed_scope_type& )
        {
            // no change in recursion scope
            return *this;
        }
        exit_recursed_scope_type reset(const recurse& r) const {
            requestor = std::addressof(r.get_recursed());
            return exit_recursed_scope_type(this);
        }
        bool is_recursed() const {
            return !!requestor;
        }
        void operator()() const {
            (*requestor)();
        }
    };
    recursed_scope_type recursed_scope;

public:
    typedef composite_subscription::weak_subscription weak_subscription;
    typedef scheduler_base::clock_type clock_type;

    ~schedulable()
    {
        if (scoped) {
            controller.lock().remove(action_scope);
        }
    }
    schedulable()
        : scoped(false)
    {
    }

    /// action and worker share lifetime
    schedulable(worker q, action a)
        : lifetime(q.get_subscription())
        , controller(q)
        , activity(std::move(a))
        , scoped(false)
    {
    }
    /// action and worker have independent lifetimes
    schedulable(composite_subscription cs, worker q, action a)
        : lifetime(std::move(cs))
        , controller(q)
        , activity(std::move(a))
        , scoped(true)
        , action_scope(controller.lock().add(lifetime))
    {
    }
    /// inherit lifetimes
    schedulable(schedulable scbl, worker q, action a)
        : lifetime(scbl.get_subscription())
        , controller(q)
        , activity(std::move(a))
        , scoped(scbl.scoped)
        , action_scope(scbl.scoped ? controller.lock().add(lifetime) : weak_subscription())
    {
    }

    inline const composite_subscription& get_subscription() const {
        return lifetime;
    }
    inline composite_subscription& get_subscription() {
        return lifetime;
    }
    inline const worker get_worker() const {
        return controller.lock();
    }
    inline worker get_worker() {
        return controller.lock();
    }
    inline const action& get_action() const {
        return activity;
    }
    inline action& get_action() {
        return activity;
    }

    inline static schedulable empty(worker sc) {
        return schedulable(composite_subscription::empty(), sc, action::empty());
    }

    inline auto set_recursed(const recurse& r) const
        -> decltype(recursed_scope.reset(r)) {
        return      recursed_scope.reset(r);
    }

    // recursed
    //
    bool is_recursed() const {
        return recursed_scope.is_recursed();
    }
    /// requests tail-recursion of the same action
    /// this will exit the process if called when
    /// is_recursed() is false.
    /// Note: to improve perf it is not required
    /// to call is_recursed() before calling this
    /// operator. Context is sufficient. The schedulable
    /// passed to the action by the scheduler will return
    /// true from is_recursed()
    inline void operator()() const {
        recursed_scope();
    }

    // composite_subscription
    //
    inline bool is_subscribed() const {
        return lifetime.is_subscribed();
    }
    inline weak_subscription add(subscription s) const {
        return lifetime.add(std::move(s));
    }
    template<class F>
    auto add(F f) const
    -> typename std::enable_if<rxcpp::detail::is_unsubscribe_function<F>::value, weak_subscription>::type {
        return lifetime.add(make_subscription(std::move(f)));
    }
    inline void remove(weak_subscription w) const {
        return lifetime.remove(std::move(w));
    }
    inline void clear() const {
        return lifetime.clear();
    }
    inline void unsubscribe() const {
        return lifetime.unsubscribe();
    }

    // scheduler
    //
    inline clock_type::time_point now() const {
        return controller.lock().now();
    }
    /// put this on the queue of the stored scheduler to run asap
    inline void schedule() const {
        if (is_subscribed()) {
            get_worker().schedule(*this);
        }
    }
    /// put this on the queue of the stored scheduler to run at the specified time
    inline void schedule(clock_type::time_point when) const {
        if (is_subscribed()) {
            get_worker().schedule(when, *this);
        }
    }
    /// put this on the queue of the stored scheduler to run after a delay from now
    inline void schedule(clock_type::duration when) const {
        if (is_subscribed()) {
            get_worker().schedule(when, *this);
        }
    }

    // action
    //
    /// invokes the action
    inline void operator()(const recurse& r) const {
        if (!is_subscribed()) {
            return;
        }
        detacher protect(this);
        activity(*this, r);
        protect.that = nullptr;
    }
};

struct current_thread;

namespace detail {

class action_type
    : public std::enable_shared_from_this<action_type>
{
    typedef action_type this_type;

public:
    typedef std::function<void(const schedulable&, const recurse&)> function_type;

private:
    function_type f;

public:
    action_type()
    {
    }

    action_type(function_type f)
        : f(std::move(f))
    {
    }

    inline void operator()(const schedulable& s, const recurse& r) {
        if (!f) {
            std::terminate();
        }
        f(s, r);
    }
};

class action_tailrecurser
    : public std::enable_shared_from_this<action_type>
{
    typedef action_type this_type;

public:
    typedef std::function<void(const schedulable&)> function_type;

private:
    function_type f;

public:
    action_tailrecurser()
    {
    }

    action_tailrecurser(function_type f)
        : f(std::move(f))
    {
    }

    inline void operator()(const schedulable& s, const recurse& r) {
        if (!f) {
            std::terminate();
        }
        trace_activity().action_enter(s);
        auto scope = s.set_recursed(r);
        while (s.is_subscribed()) {
            r.reset();
            f(s);
            if (!r.is_allowed() || !r.is_requested()) {
                if (r.is_requested()) {
                    s.schedule();
                }
                break;
            }
            trace_activity().action_recurse(s);
        }
        trace_activity().action_return(s);
    }
};
}

inline void action::operator()(const schedulable& s, const recurse& r) const {
    (*inner)(s, r);
}

inline action make_action_empty() {
    return action::empty();
}

template<class F>
inline action make_action(F&& f) {
    static_assert(detail::is_action_function<F>::value, "action function must be void(schedulable)");
    auto fn = std::forward<F>(f);
    return action(std::make_shared<detail::action_type>(detail::action_tailrecurser(fn)));
}

// copy
inline auto make_schedulable(
    const   schedulable& scbl)
    ->      schedulable {
    return  schedulable(scbl);
}
// move
inline auto make_schedulable(
            schedulable&& scbl)
    ->      schedulable {
    return  schedulable(std::move(scbl));
}

inline schedulable make_schedulable(worker sc, action a) {
    return schedulable(sc, a);
}
inline schedulable make_schedulable(worker sc, composite_subscription cs, action a) {
    return schedulable(cs, sc, a);
}

template<class F>
auto make_schedulable(worker sc, F&& f)
    -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    return schedulable(sc, make_action(std::forward<F>(f)));
}
template<class F>
auto make_schedulable(worker sc, composite_subscription cs, F&& f)
    -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    return schedulable(cs, sc, make_action(std::forward<F>(f)));
}
template<class F>
auto make_schedulable(schedulable scbl, composite_subscription cs, F&& f)
    -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    return schedulable(cs, scbl.get_worker(), make_action(std::forward<F>(f)));
}
template<class F>
auto make_schedulable(schedulable scbl, worker sc, F&& f)
    -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    return schedulable(scbl, sc, make_action(std::forward<F>(f)));
}
template<class F>
auto make_schedulable(schedulable scbl, F&& f)
    -> typename std::enable_if<detail::is_action_function<F>::value, schedulable>::type {
    return schedulable(scbl, scbl.get_worker(), make_action(std::forward<F>(f)));
}

inline auto make_schedulable(schedulable scbl, composite_subscription cs)
    -> schedulable {
    return schedulable(cs, scbl.get_worker(), scbl.get_action());
}
inline auto make_schedulable(schedulable scbl, worker sc, composite_subscription cs)
    -> schedulable {
    return schedulable(cs, sc, scbl.get_action());
}
inline auto make_schedulable(schedulable scbl, worker sc)
    -> schedulable {
    return schedulable(scbl, sc, scbl.get_action());
}

template<class Arg0, class... ArgN>
auto worker::schedule(Arg0&& a0, ArgN&&... an) const
    -> typename std::enable_if<
        (detail::is_action_function<Arg0>::value ||
        is_subscription<Arg0>::value) &&
        !is_schedulable<Arg0>::value>::type {
    auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
    trace_activity().schedule_enter(*inner.get(), scbl);
    inner->schedule(std::move(scbl));
    trace_activity().schedule_return(*inner.get());
}
template<class... ArgN>
void worker::schedule_rebind(const schedulable& scbl, ArgN&&... an) const {
    auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
    trace_activity().schedule_enter(*inner.get(), rescbl);
    inner->schedule(std::move(rescbl));
    trace_activity().schedule_return(*inner.get());
}

template<class Arg0, class... ArgN>
auto worker::schedule(clock_type::time_point when, Arg0&& a0, ArgN&&... an) const
    -> typename std::enable_if<
        (detail::is_action_function<Arg0>::value ||
        is_subscription<Arg0>::value) &&
        !is_schedulable<Arg0>::value>::type {
    auto scbl = make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...);
    trace_activity().schedule_when_enter(*inner.get(), when, scbl);
    inner->schedule(when, std::move(scbl));
    trace_activity().schedule_when_return(*inner.get());
}
template<class... ArgN>
void worker::schedule_rebind(clock_type::time_point when, const schedulable& scbl, ArgN&&... an) const {
    auto rescbl = make_schedulable(scbl, *this, std::forward<ArgN>(an)...);
    trace_activity().schedule_when_enter(*inner.get(), when, rescbl);
    inner->schedule(when, std::move(rescbl));
    trace_activity().schedule_when_return(*inner.get());
}

template<class Arg0, class... ArgN>
auto worker::schedule_periodically(clock_type::time_point initial, clock_type::duration period, Arg0&& a0, ArgN&&... an) const
    -> typename std::enable_if<
        (detail::is_action_function<Arg0>::value ||
        is_subscription<Arg0>::value) &&
        !is_schedulable<Arg0>::value>::type {
    schedule_periodically_rebind(initial, period, make_schedulable(*this, std::forward<Arg0>(a0), std::forward<ArgN>(an)...));
}
template<class... ArgN>
void worker::schedule_periodically_rebind(clock_type::time_point initial, clock_type::duration period, const schedulable& scbl, ArgN&&... an) const {
    auto keepAlive = *this;
    auto target = std::make_shared<clock_type::time_point>(initial);
    auto activity = make_schedulable(scbl, keepAlive, std::forward<ArgN>(an)...);
    auto periodic = make_schedulable(
        activity,
        [keepAlive, target, period, activity](schedulable self) {
            // any recursion requests will be pushed to the scheduler queue
            recursion r(false);
            // call action
            activity(r.get_recurse());

            // schedule next occurance (if the action took longer than 'period' target will be in the past)
            *target += period;
            self.schedule(*target);
        });
    trace_activity().schedule_when_enter(*inner.get(), *target, periodic);
    inner->schedule(*target, periodic);
    trace_activity().schedule_when_return(*inner.get());
}

namespace detail {

template<class TimePoint>
struct time_schedulable
{
    typedef TimePoint time_point_type;

    time_schedulable(TimePoint when, schedulable a)
        : when(when)
        , what(std::move(a))
    {
    }
    TimePoint when;
    schedulable what;
};


// Sorts time_schedulable items in priority order sorted
// on value of time_schedulable.when. Items with equal
// values for when are sorted in fifo order.
template<class TimePoint>
class schedulable_queue {
public:
    typedef time_schedulable<TimePoint> item_type;
    typedef std::pair<item_type, int64_t> elem_type;
    typedef std::vector<elem_type> container_type;
    typedef const item_type& const_reference;

private:
    struct compare_elem
    {
        bool operator()(const elem_type& lhs, const elem_type& rhs) const {
            if (lhs.first.when == rhs.first.when) {
                return lhs.second > rhs.second;
            }
            else {
                return lhs.first.when > rhs.first.when;
            }
        }
    };

    typedef std::priority_queue<
        elem_type,
        container_type,
        compare_elem
    > queue_type;

    queue_type q;

    int64_t ordinal;
public:

    schedulable_queue() 
        : ordinal(0) 
    {
    }

    const_reference top() const {
        return q.top().first;
    }

    void pop() {
        q.pop();
    }

    bool empty() const {
        return q.empty();
    }

    void push(const item_type& value) {
        q.push(elem_type(value, ordinal++));
    }

    void push(item_type&& value) {
        q.push(elem_type(std::move(value), ordinal++));
    }
};

}

}
namespace rxsc=schedulers;

}

#include "schedulers/rx-currentthread.hpp"
#include "schedulers/rx-runloop.hpp"
#include "schedulers/rx-newthread.hpp"
#include "schedulers/rx-eventloop.hpp"
#include "schedulers/rx-immediate.hpp"
#include "schedulers/rx-virtualtime.hpp"
#include "schedulers/rx-sameworker.hpp"

#endif