// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_RX_COORDINATION_HPP)
#define RXCPP_RX_COORDINATION_HPP

#include "rx-includes.hpp"

namespace rxcpp {

struct tag_coordinator {};
struct coordinator_base {typedef tag_coordinator coordinator_tag;};

template<class T, class C = rxu::types_checked>
struct is_coordinator : public std::false_type {};

template<class T>
struct is_coordinator<T, typename rxu::types_checked_from<typename T::coordinator_tag>::type>
    : public std::is_convertible<typename T::coordinator_tag*, tag_coordinator*> {};

struct tag_coordination {};
struct coordination_base {typedef tag_coordination coordination_tag;};

namespace detail {

template<class T, class C = rxu::types_checked>
struct is_coordination : public std::false_type {};

template<class T>
struct is_coordination<T, typename rxu::types_checked_from<typename T::coordination_tag>::type>
    : public std::is_convertible<typename T::coordination_tag*, tag_coordination*> {};

}

template<class T, class Decayed = rxu::decay_t<T>>
struct is_coordination : detail::is_coordination<Decayed>
{
};

template<class Coordination, class DecayedCoordination = rxu::decay_t<Coordination>>
using coordination_tag_t = typename DecayedCoordination::coordination_tag;

template<class Input>
class coordinator : public coordinator_base
{
public:
    typedef Input input_type;

private:
    struct not_supported {typedef not_supported type;};

    template<class Observable>
    struct get_observable
    {
        typedef decltype((*(input_type*)nullptr).in((*(Observable*)nullptr))) type;
    };

    template<class Subscriber>
    struct get_subscriber
    {
        typedef decltype((*(input_type*)nullptr).out((*(Subscriber*)nullptr))) type;
    };

    template<class F>
    struct get_action_function
    {
        typedef decltype((*(input_type*)nullptr).act((*(F*)nullptr))) type;
    };

public:
    input_type input;

    template<class T>
    struct get
    {
        typedef typename std::conditional<
            rxsc::detail::is_action_function<T>::value, get_action_function<T>, typename std::conditional<
            is_observable<T>::value, get_observable<T>, typename std::conditional<
            is_subscriber<T>::value, get_subscriber<T>, not_supported>::type>::type>::type::type type;
    };

    coordinator(Input i) : input(i) {}

    rxsc::worker get_worker() const {
        return input.get_worker();
    }
    rxsc::scheduler get_scheduler() const {
        return input.get_scheduler();
    }

    template<class Observable>
    auto in(Observable o) const
        -> typename get_observable<Observable>::type {
        return input.in(std::move(o));
        static_assert(is_observable<Observable>::value, "can only synchronize observables");
    }

    template<class Subscriber>
    auto out(Subscriber s) const
        -> typename get_subscriber<Subscriber>::type {
        return input.out(std::move(s));
        static_assert(is_subscriber<Subscriber>::value, "can only synchronize subscribers");
    }

    template<class F>
    auto act(F f) const
        -> typename get_action_function<F>::type {
        return input.act(std::move(f));
        static_assert(rxsc::detail::is_action_function<F>::value, "can only synchronize action functions");
    }
};

class identity_one_worker : public coordination_base
{
    rxsc::scheduler factory;

    class input_type
    {
        rxsc::worker controller;
        rxsc::scheduler factory;
    public:
        explicit input_type(rxsc::worker w)
            : controller(w)
            , factory(rxsc::make_same_worker(w))
        {
        }
        inline rxsc::worker get_worker() const {
            return controller;
        }
        inline rxsc::scheduler get_scheduler() const {
            return factory;
        }
        inline rxsc::scheduler::clock_type::time_point now() const {
            return factory.now();
        }
        template<class Observable>
        auto in(Observable o) const
            -> Observable {
            return o;
        }
        template<class Subscriber>
        auto out(Subscriber s) const
            -> Subscriber {
            return s;
        }
        template<class F>
        auto act(F f) const
            -> F {
            return f;
        }
    };

public:

    explicit identity_one_worker(rxsc::scheduler sc) : factory(sc) {}

    typedef coordinator<input_type> coordinator_type;

    inline rxsc::scheduler::clock_type::time_point now() const {
        return factory.now();
    }

    inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
        auto w = factory.create_worker(std::move(cs));
        return coordinator_type(input_type(std::move(w)));
    }
};

inline identity_one_worker identity_immediate() {
    static identity_one_worker r(rxsc::make_immediate());
    return r;
}

inline identity_one_worker identity_current_thread() {
    static identity_one_worker r(rxsc::make_current_thread());
    return r;
}

inline identity_one_worker identity_same_worker(rxsc::worker w) {
    return identity_one_worker(rxsc::make_same_worker(w));
}

class serialize_one_worker : public coordination_base
{
    rxsc::scheduler factory;

    template<class F>
    struct serialize_action
    {
        F dest;
        std::shared_ptr<std::mutex> lock;
        serialize_action(F d, std::shared_ptr<std::mutex> m)
            : dest(std::move(d))
            , lock(std::move(m))
        {
            if (!lock) {
                std::terminate();
            }
        }
        auto operator()(const rxsc::schedulable& scbl) const
            -> decltype(dest(scbl)) {
            std::unique_lock<std::mutex> guard(*lock);
            return dest(scbl);
        }
    };

    template<class Observer>
    struct serialize_observer
    {
        typedef serialize_observer<Observer> this_type;
        typedef rxu::decay_t<Observer> dest_type;
        typedef typename dest_type::value_type value_type;
        typedef observer<value_type, this_type> observer_type;
        dest_type dest;
        std::shared_ptr<std::mutex> lock;

        serialize_observer(dest_type d, std::shared_ptr<std::mutex> m)
            : dest(std::move(d))
            , lock(std::move(m))
        {
            if (!lock) {
                std::terminate();
            }
        }
        void on_next(value_type v) const {
            std::unique_lock<std::mutex> guard(*lock);
            dest.on_next(v);
        }
        void on_error(rxu::error_ptr e) const {
            std::unique_lock<std::mutex> guard(*lock);
            dest.on_error(e);
        }
        void on_completed() const {
            std::unique_lock<std::mutex> guard(*lock);
            dest.on_completed();
        }

        template<class Subscriber>
        static subscriber<value_type, observer_type> make(const Subscriber& s, std::shared_ptr<std::mutex> m) {
            return make_subscriber<value_type>(s, observer_type(this_type(s.get_observer(), std::move(m))));
        }
    };

    class input_type
    {
        rxsc::worker controller;
        rxsc::scheduler factory;
        std::shared_ptr<std::mutex> lock;
    public:
        explicit input_type(rxsc::worker w, std::shared_ptr<std::mutex> m)
            : controller(w)
            , factory(rxsc::make_same_worker(w))
            , lock(std::move(m))
        {
        }
        inline rxsc::worker get_worker() const {
            return controller;
        }
        inline rxsc::scheduler get_scheduler() const {
            return factory;
        }
        inline rxsc::scheduler::clock_type::time_point now() const {
            return factory.now();
        }
        template<class Observable>
        auto in(Observable o) const
            -> Observable {
            return o;
        }
        template<class Subscriber>
        auto out(const Subscriber& s) const
            -> decltype(serialize_observer<decltype(s.get_observer())>::make(s, lock)) {
            return      serialize_observer<decltype(s.get_observer())>::make(s, lock);
        }
        template<class F>
        auto act(F f) const
            ->      serialize_action<F> {
            return  serialize_action<F>(std::move(f), lock);
        }
    };

public:

    explicit serialize_one_worker(rxsc::scheduler sc) : factory(sc) {}

    typedef coordinator<input_type> coordinator_type;

    inline rxsc::scheduler::clock_type::time_point now() const {
        return factory.now();
    }

    inline coordinator_type create_coordinator(composite_subscription cs = composite_subscription()) const {
        auto w = factory.create_worker(std::move(cs));
        std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
        return coordinator_type(input_type(std::move(w), std::move(lock)));
    }
};

inline serialize_one_worker serialize_event_loop() {
    static serialize_one_worker r(rxsc::make_event_loop());
    return r;
}

inline serialize_one_worker serialize_new_thread() {
    static serialize_one_worker r(rxsc::make_new_thread());
    return r;
}

inline serialize_one_worker serialize_same_worker(rxsc::worker w) {
    return serialize_one_worker(rxsc::make_same_worker(w));
}

}

#endif