// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#pragma once

#if !defined(RXCPP_RX_OBSERVER_HPP)
#define RXCPP_RX_OBSERVER_HPP

#include "rx-includes.hpp"

namespace rxcpp {


// Common base of every observer in this file: publishes the element type
// of the stream and the tag type used to recognise observers.
template<class T>
struct observer_base
{
    using value_type = T;
    using observer_tag = tag_observer;
};

namespace detail {
// No-op on_next handler: accepts a value of type T and discards it.
template<class T>
struct OnNextEmpty
{
    void operator()(const T& /*discarded*/) const {
    }
};
// on_error handler that refuses to drop an error silently: invoking it
// calls std::terminate().
struct OnErrorEmpty
{
    void operator()(rxu::error_ptr /*unhandled*/) const {
        // nobody asked to handle the error, so abort instead of ignoring it
        std::terminate();
    }
};
// on_error handler that deliberately does nothing with the error it receives.
struct OnErrorIgnore
{
    void operator()(rxu::error_ptr /*ignored*/) const {}
};
// No-op on_completed handler.
struct OnCompletedEmpty
{
    void operator()() const {
    }
};

template<class T, class State, class OnNext>
struct OnNextForward
{
    using state_t = rxu::decay_t<State>;
    using onnext_t = rxu::decay_t<OnNext>;
    OnNextForward() : onnext() {}
    explicit OnNextForward(onnext_t on) : onnext(std::move(on)) {}
    onnext_t onnext;
    void operator()(state_t& s, T& t) const {
        onnext(s, t);
    }
    void operator()(state_t& s, T&& t) const {
        onnext(s, t);
    }
};
template<class T, class State>
struct OnNextForward<T, State, void>
{
    using state_t = rxu::decay_t<State>;
    OnNextForward() {}
    void operator()(state_t& s, T& t) const {
        s.on_next(t);
    }
    void operator()(state_t& s, T&& t) const {
        s.on_next(t);
    }
};

template<class State, class OnError>
struct OnErrorForward
{
    using state_t = rxu::decay_t<State>;
    using onerror_t = rxu::decay_t<OnError>;
    OnErrorForward() : onerror() {}
    explicit OnErrorForward(onerror_t oe) : onerror(std::move(oe)) {}
    onerror_t onerror;
    void operator()(state_t& s, rxu::error_ptr ep) const {
        onerror(s, ep);
    }
};
template<class State>
struct OnErrorForward<State, void>
{
    using state_t = rxu::decay_t<State>;
    OnErrorForward() {}
    void operator()(state_t& s, rxu::error_ptr ep) const {
        s.on_error(ep);
    }
};

template<class State, class OnCompleted>
struct OnCompletedForward
{
    using state_t = rxu::decay_t<State>;
    using oncompleted_t = rxu::decay_t<OnCompleted>;
    OnCompletedForward() : oncompleted() {}
    explicit OnCompletedForward(oncompleted_t oc) : oncompleted(std::move(oc)) {}
    oncompleted_t oncompleted;
    void operator()(state_t& s) const {
        oncompleted(s);
    }
};
template<class State>
struct OnCompletedForward<State, void>
{
    OnCompletedForward() {}
    void operator()(State& s) const {
        s.on_completed();
    }
};

// Detects whether the decayed F can be called with an lvalue of T and
// returns void. `value` is true only for an exact void result.
template<class T, class F>
struct is_on_next_of
{
    struct not_void {};

    // fallback, selected when the call expression below is ill-formed
    template<class CT, class CF>
    static not_void check(...);
    // preferred for the argument 0 (int beats ...): type of calling a CF lvalue with a CT lvalue
    template<class CT, class CF>
    static auto check(int) -> decltype((*static_cast<CF*>(nullptr))(*static_cast<CT*>(nullptr)));

    using detail_result = decltype(check<T, rxu::decay_t<F>>(0));
    static const bool value = std::is_same<detail_result, void>::value;
};

template<class F>
struct is_on_error
{
    struct not_void {};
    template<class CF>
    static auto check(int) -> decltype((*(CF*)nullptr)(*(rxu::error_ptr*)nullptr));
    template<class CF>
    static not_void check(...);

    static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
};

template<class State, class F>
struct is_on_error_for
{
    struct not_void {};
    template<class CF>
    static auto check(int) -> decltype((*(CF*)nullptr)(*(State*)nullptr, *(rxu::error_ptr*)nullptr));
    template<class CF>
    static not_void check(...);

    static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
};

// Detects whether the decayed F can be called with no arguments and
// returns void.
template<class F>
struct is_on_completed
{
    struct not_void {};

    // fallback, selected when the call expression below is ill-formed
    template<class CF>
    static not_void check(...);
    // preferred for the argument 0: type of calling a CF lvalue with no arguments
    template<class CF>
    static auto check(int) -> decltype((*static_cast<CF*>(nullptr))());

    static const bool value = std::is_same<decltype(check<rxu::decay_t<F>>(0)), void>::value;
};

}


/*!
    \brief consumes values from an observable using `State` that may implement on_next, on_error and on_completed with optional overrides of each function.

    \tparam T            - the type of value in the stream
    \tparam State        - the type of the stored state
    \tparam OnNext       - the type of a function that matches `void(State&, T)`. Called 0 or more times. If `void` State::on_next will be called.
    \tparam OnError      - the type of a function that matches `void(State&, rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_error will be called.
    \tparam OnCompleted  - the type of a function that matches `void(State&)`. Called 0 or 1 times, no further calls will be made. If `void` State::on_completed will be called.

    \ingroup group-core

*/
template<class T, class State, class OnNext, class OnError, class OnCompleted>
class observer : public observer_base<T>
{
public:
    using this_type = observer<T, State, OnNext, OnError, OnCompleted>;
    using state_t = rxu::decay_t<State>;

    // Each handler slot is the decayed user-supplied type or, when the
    // corresponding parameter is void, the forwarder that calls the
    // matching member function of the state.
    using on_next_t = typename std::conditional<
        std::is_same<void, OnNext>::value,
        detail::OnNextForward<T, State, OnNext>,
        rxu::decay_t<OnNext>>::type;
    using on_error_t = typename std::conditional<
        std::is_same<void, OnError>::value,
        detail::OnErrorForward<State, OnError>,
        rxu::decay_t<OnError>>::type;
    using on_completed_t = typename std::conditional<
        std::is_same<void, OnCompleted>::value,
        detail::OnCompletedForward<State, OnCompleted>,
        rxu::decay_t<OnCompleted>>::type;

private:
    // mutable: the const notification methods below pass the state to the
    // handlers as a non-const lvalue.
    mutable state_t state;
    on_next_t onnext;
    on_error_t onerror;
    on_completed_t oncompleted;

public:

    // state plus up to three handlers; omitted handlers are default-constructed
    explicit observer(state_t st, on_next_t next = on_next_t(), on_error_t error = on_error_t(), on_completed_t completed = on_completed_t())
        : state(std::move(st))
        , onnext(std::move(next))
        , onerror(std::move(error))
        , oncompleted(std::move(completed))
    {
    }
    // state, on_next and on_completed; on_error is default-constructed
    explicit observer(state_t st, on_next_t next, on_completed_t completed)
        : state(std::move(st))
        , onnext(std::move(next))
        , onerror(on_error_t())
        , oncompleted(std::move(completed))
    {
    }
    observer(const this_type& other)
        : state(other.state)
        , onnext(other.onnext)
        , onerror(other.onerror)
        , oncompleted(other.oncompleted)
    {
    }
    observer(this_type&& other)
        : state(std::move(other.state))
        , onnext(std::move(other.onnext))
        , onerror(std::move(other.onerror))
        , oncompleted(std::move(other.oncompleted))
    {
    }
    // by-value parameter: the argument is copied or moved at the call site,
    // then each member is moved out of it
    this_type& operator=(this_type other) {
        state = std::move(other.state);
        onnext = std::move(other.onnext);
        onerror = std::move(other.onerror);
        oncompleted = std::move(other.oncompleted);
        return *this;
    }

    void on_next(T& value) const {
        onnext(state, value);
    }
    void on_next(T&& value) const {
        onnext(state, std::move(value));
    }
    void on_error(rxu::error_ptr error) const {
        onerror(state, error);
    }
    void on_completed() const {
        oncompleted(state);
    }

    // wraps a copy of this observer in an observer<T>
    observer<T> as_dynamic() const {
        return observer<T>(*this);
    }
};

/*!
    \brief consumes values from an observable using default empty method implementations with optional overrides of each function.

    \tparam T            - the type of value in the stream
    \tparam OnNext       - the type of a function that matches `void(T)`. Called 0 or more times. If `void` OnNextEmpty<T> is used.
    \tparam OnError      - the type of a function that matches `void(rxu::error_ptr)`. Called 0 or 1 times, no further calls will be made. If `void` OnErrorEmpty is used.
    \tparam OnCompleted  - the type of a function that matches `void()`. Called 0 or 1 times, no further calls will be made. If `void` OnCompletedEmpty is used.

    \ingroup group-core

*/
template<class T, class OnNext, class OnError, class OnCompleted>
class observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted> : public observer_base<T>
{
public:
    using this_type = observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>;

    // Each handler slot is the decayed user-supplied type or, when the
    // corresponding parameter is void, the matching detail::On*Empty default.
    using on_next_t = typename std::conditional<
        std::is_same<void, OnNext>::value,
        detail::OnNextEmpty<T>,
        rxu::decay_t<OnNext>>::type;
    using on_error_t = typename std::conditional<
        std::is_same<void, OnError>::value,
        detail::OnErrorEmpty,
        rxu::decay_t<OnError>>::type;
    using on_completed_t = typename std::conditional<
        std::is_same<void, OnCompleted>::value,
        detail::OnCompletedEmpty,
        rxu::decay_t<OnCompleted>>::type;

    // reject handlers with the wrong signature at instantiation time
    static_assert(detail::is_on_next_of<T, on_next_t>::value,     "Function supplied for on_next must be a function with the signature void(T);");
    static_assert(detail::is_on_error<on_error_t>::value,         "Function supplied for on_error must be a function with the signature void(rxu::error_ptr);");
    static_assert(detail::is_on_completed<on_completed_t>::value, "Function supplied for on_completed must be a function with the signature void();");

private:
    on_next_t onnext;
    on_error_t onerror;
    on_completed_t oncompleted;

public:
    // all three handlers default-constructed
    observer()
        : onnext(on_next_t())
        , onerror(on_error_t())
        , oncompleted(on_completed_t())
    {
    }

    // on_next plus optional on_error / on_completed; omitted handlers are default-constructed
    explicit observer(on_next_t next, on_error_t error = on_error_t(), on_completed_t completed = on_completed_t())
        : onnext(std::move(next))
        , onerror(std::move(error))
        , oncompleted(std::move(completed))
    {
    }
    observer(const this_type& other)
        : onnext(other.onnext)
        , onerror(other.onerror)
        , oncompleted(other.oncompleted)
    {
    }
    observer(this_type&& other)
        : onnext(std::move(other.onnext))
        , onerror(std::move(other.onerror))
        , oncompleted(std::move(other.oncompleted))
    {
    }
    // by-value parameter: the argument is copied or moved at the call site,
    // then each member is moved out of it
    this_type& operator=(this_type other) {
        onnext = std::move(other.onnext);
        onerror = std::move(other.onerror);
        oncompleted = std::move(other.oncompleted);
        return *this;
    }

    void on_next(T& value) const {
        onnext(value);
    }
    void on_next(T&& value) const {
        onnext(std::move(value));
    }
    void on_error(rxu::error_ptr error) const {
        onerror(error);
    }
    void on_completed() const {
        oncompleted();
    }

    // wraps a copy of this observer in an observer<T>
    observer<T> as_dynamic() const {
        return observer<T>(*this);
    }
};

namespace detail
{

// Polymorphic observer interface used for type erasure. Every notification
// has an empty default body, so a derived class overrides only what it needs.
template<class T>
struct virtual_observer : public std::enable_shared_from_this<virtual_observer<T>>
{
    virtual ~virtual_observer() {}

    virtual void on_next(T&) const {}
    virtual void on_next(T&&) const {}
    virtual void on_error(rxu::error_ptr) const {}
    virtual void on_completed() const {}
};

// virtual_observer implementation that owns a concrete Observer and relays
// every notification to it.
template<class T, class Observer>
struct specific_observer : public virtual_observer<T>
{
    Observer destination;

    explicit specific_observer(Observer target)
        : destination(std::move(target))
    {
    }

    virtual void on_next(T& value) const {
        destination.on_next(value);
    }
    virtual void on_next(T&& value) const {
        // keep the value an rvalue on its way to the destination
        destination.on_next(std::move(value));
    }
    virtual void on_error(rxu::error_ptr error) const {
        destination.on_error(error);
    }
    virtual void on_completed() const {
        destination.on_completed();
    }
};

}

/*!
    \brief consumes values from an observable using type-forgetting (shared allocated state with virtual methods)

    \tparam T            - the type of value in the stream

    \ingroup group-core

*/
template<class T>
class observer<T, void, void, void, void> : public observer_base<T>
{
public:
    using dynamic_observer_tag = tag_dynamic_observer;

private:
    using this_type = observer<T, void, void, void, void>;
    using base_type = observer_base<T>;
    using virtual_observer = detail::virtual_observer<T>;

    // Shared, type-erased target. Null for a default-constructed observer,
    // in which case every notification below is a no-op.
    std::shared_ptr<virtual_observer> destination;

    // allocates a specific_observer that owns `o` and returns it as the erased base
    template<class Observer>
    static std::shared_ptr<virtual_observer> make_destination(Observer o) {
        using erased_t = detail::specific_observer<T, Observer>;
        return std::make_shared<erased_t>(std::move(o));
    }

public:
    observer()
    {
    }
    observer(const this_type& other)
        : destination(other.destination)
    {
    }
    observer(this_type&& other)
        : destination(std::move(other.destination))
    {
    }

    // type-erase any concrete observer
    template<class Observer>
    explicit observer(Observer o)
        : destination(make_destination(std::move(o)))
    {
    }

    this_type& operator=(this_type other) {
        destination = std::move(other.destination);
        return *this;
    }

    // perfect forwarding delays the copy of the value.
    template<class V>
    void on_next(V&& v) const {
        if (!destination) {
            return;
        }
        destination->on_next(std::forward<V>(v));
    }
    void on_error(rxu::error_ptr e) const {
        if (!destination) {
            return;
        }
        destination->on_error(e);
    }
    void on_completed() const {
        if (!destination) {
            return;
        }
        destination->on_completed();
    }

    // already dynamic: returns a copy of this observer
    observer<T> as_dynamic() const {
        return *this;
    }
};

// No arguments: a stateless observer with OnNextEmpty<T>, DefaultOnError and
// the default on_completed handler, all default-constructed.
template<class T, class DefaultOnError = detail::OnErrorEmpty>
auto make_observer()
    ->      observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError> {
    using result_t = observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, DefaultOnError>;
    return result_t();
}

// Given an existing observer: constructs an observer<T, ...> with the same
// State and handler types from it. DefaultOnError is not used by this overload.
template<class T, class DefaultOnError = detail::OnErrorEmpty, class U, class State, class OnNext, class OnError, class OnCompleted>
auto make_observer(observer<U, State, OnNext, OnError, OnCompleted> o)
    ->      observer<T, State, OnNext, OnError, OnCompleted> {
    using result_t = observer<T, State, OnNext, OnError, OnCompleted>;
    return result_t(std::move(o));
}
// Given something that is_observer recognises (and that is neither an
// on_next nor an on_error callable): hands it back unchanged.
template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
auto make_observer(Observer existing)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, Observer>::value &&
        !detail::is_on_error<Observer>::value &&
        is_observer<Observer>::value,
            Observer>::type {
    return std::move(existing);
}
// Given a non-callable, non-observer object: uses it as the State of an
// observer<T, Observer>. The remaining template arguments take their
// defaults, which are declared outside this file.
template<class T, class DefaultOnError = detail::OnErrorEmpty, class Observer>
auto make_observer(Observer state)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, Observer>::value &&
        !detail::is_on_error<Observer>::value &&
        !is_observer<Observer>::value,
            observer<T, Observer>>::type {
    using result_t = observer<T, Observer>;
    return result_t(std::move(state));
}
// on_next only: stateless observer using DefaultOnError for errors.
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext>
auto make_observer(OnNext next)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value,
            observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>>::type {
    using result_t = observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError>;
    return result_t(std::move(next));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnError>
auto make_observer(OnError oe)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, OnError>::value &&
        detail::is_on_error<OnError>::value,
            observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>>::type {
    return  observer<T, detail::stateless_observer_tag, detail::OnNextEmpty<T>, OnError>(
                        detail::OnNextEmpty<T>(), std::move(oe));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError>
auto make_observer(OnNext on, OnError oe)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value &&
        detail::is_on_error<OnError>::value,
            observer<T, detail::stateless_observer_tag, OnNext, OnError>>::type {
    return  observer<T, detail::stateless_observer_tag, OnNext, OnError>(
                        std::move(on), std::move(oe));
}
// on_next + on_completed: stateless observer whose error handler is a
// default-constructed DefaultOnError.
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnCompleted>
auto make_observer(OnNext next, OnCompleted completed)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value &&
        detail::is_on_completed<OnCompleted>::value,
            observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>>::type {
    using result_t = observer<T, detail::stateless_observer_tag, OnNext, DefaultOnError, OnCompleted>;
    return result_t(std::move(next), DefaultOnError(), std::move(completed));
}
template<class T, class DefaultOnError = detail::OnErrorEmpty, class OnNext, class OnError, class OnCompleted>
auto make_observer(OnNext on, OnError oe, OnCompleted oc)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value &&
        detail::is_on_error<OnError>::value &&
        detail::is_on_completed<OnCompleted>::value,
            observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>>::type {
    return  observer<T, detail::stateless_observer_tag, OnNext, OnError, OnCompleted>(
                        std::move(on), std::move(oe), std::move(oc));
}


// State + on_next: stateful observer whose on_next handler is `on`.
//
// The second argument must NOT be callable as an error handler for State
// (is_on_error_for). The (State, OnError) overload has the same parameter
// list and accepts exactly those callables; without this exclusion both
// overloads are viable for such a call and it is rejected as ambiguous.
// Calls whose second argument is not an error handler resolve as before.
template<class T, class State, class OnNext>
auto make_observer(State os, OnNext on)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, State>::value &&
        !detail::is_on_error<State>::value &&
        !detail::is_on_error_for<State, OnNext>::value,
            observer<T, State, OnNext>>::type {
    return  observer<T, State, OnNext>(
                        std::move(os), std::move(on));
}
// State + on_error: stateful observer whose error handler is `oe` and whose
// on_next slot is an OnNextEmpty<T>.
// NOTE(review): OnNextEmpty<T>::operator() takes a single value, while the
// stateful observer invokes its on_next handler as onnext(state, value), so
// calling on_next on the returned observer looks like it cannot compile --
// confirm which on_next handler is intended here.
template<class T, class State, class OnError>
auto make_observer(State os, OnError oe)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, State>::value &&
        !detail::is_on_error<State>::value &&
        detail::is_on_error_for<State, OnError>::value,
            observer<T, State, detail::OnNextEmpty<T>, OnError>>::type {
    using result_t = observer<T, State, detail::OnNextEmpty<T>, OnError>;
    return result_t(std::move(os), detail::OnNextEmpty<T>(), std::move(oe));
}
// State + on_next + on_error: stateful observer; the third argument must be
// callable as (State&, rxu::error_ptr).
template<class T, class State, class OnNext, class OnError>
auto make_observer(State os, OnNext on, OnError oe)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, State>::value &&
        !detail::is_on_error<State>::value &&
        detail::is_on_error_for<State, OnError>::value,
            observer<T, State, OnNext, OnError>>::type {
    using result_t = observer<T, State, OnNext, OnError>;
    return result_t(std::move(os), std::move(on), std::move(oe));
}
// State + on_next + on_completed: stateful observer; OnError is void, so
// errors are forwarded to State::on_error.
//
// The third argument must NOT be callable as an error handler for State
// (is_on_error_for). The (State, OnNext, OnError) overload has the same
// parameter list and accepts exactly those callables; without this exclusion
// both overloads are viable for such a call and it is rejected as ambiguous.
// Calls whose third argument is not an error handler resolve as before.
template<class T, class State, class OnNext, class OnCompleted>
auto make_observer(State os, OnNext on, OnCompleted oc)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, State>::value &&
        !detail::is_on_error<State>::value &&
        !detail::is_on_error_for<State, OnCompleted>::value,
            observer<T, State, OnNext, void, OnCompleted>>::type {
    return  observer<T, State, OnNext, void, OnCompleted>(
                        std::move(os), std::move(on), std::move(oc));
}
// State + all three handlers: stateful observer; the error handler must be
// callable as (State&, rxu::error_ptr).
template<class T, class State, class OnNext, class OnError, class OnCompleted>
auto make_observer(State os, OnNext on, OnError oe, OnCompleted oc)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, State>::value &&
        !detail::is_on_error<State>::value &&
        detail::is_on_error_for<State, OnError>::value,
            observer<T, State, OnNext, OnError, OnCompleted>>::type {
    using result_t = observer<T, State, OnNext, OnError, OnCompleted>;
    return result_t(std::move(os), std::move(on), std::move(oe), std::move(oc));
}

// Anything that is not an on_next callable: wrapped directly in observer<T>.
template<class T, class Observer>
auto make_observer_dynamic(Observer o)
    -> typename std::enable_if<
        !detail::is_on_next_of<T, Observer>::value,
            observer<T>>::type {
    using dynamic_t = observer<T>;
    return dynamic_t(std::move(o));
}
// on_next only: builds the observer with make_observer<T>, then wraps it in observer<T>.
template<class T, class OnNext>
auto make_observer_dynamic(OnNext&& on)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value,
            observer<T>>::type {
    using dynamic_t = observer<T>;
    return dynamic_t(make_observer<T>(std::forward<OnNext>(on)));
}
// on_next + on_error: builds the observer with make_observer<T>, then wraps it in observer<T>.
template<class T, class OnNext, class OnError>
auto make_observer_dynamic(OnNext&& on, OnError&& oe)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value &&
        detail::is_on_error<OnError>::value,
            observer<T>>::type {
    using dynamic_t = observer<T>;
    return dynamic_t(make_observer<T>(
        std::forward<OnNext>(on),
        std::forward<OnError>(oe)));
}
// on_next + on_completed: builds the observer with make_observer<T>, then wraps it in observer<T>.
template<class T, class OnNext, class OnCompleted>
auto make_observer_dynamic(OnNext&& on, OnCompleted&& oc)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value &&
        detail::is_on_completed<OnCompleted>::value,
            observer<T>>::type {
    using dynamic_t = observer<T>;
    return dynamic_t(make_observer<T>(
        std::forward<OnNext>(on),
        std::forward<OnCompleted>(oc)));
}
// All three handlers: builds the observer with make_observer<T>, then wraps it in observer<T>.
template<class T, class OnNext, class OnError, class OnCompleted>
auto make_observer_dynamic(OnNext&& on, OnError&& oe, OnCompleted&& oc)
    -> typename std::enable_if<
        detail::is_on_next_of<T, OnNext>::value &&
        detail::is_on_error<OnError>::value &&
        detail::is_on_completed<OnCompleted>::value,
            observer<T>>::type {
    using dynamic_t = observer<T>;
    return dynamic_t(make_observer<T>(
        std::forward<OnNext>(on),
        std::forward<OnError>(oe),
        std::forward<OnCompleted>(oc)));
}

namespace detail {

// Computes rxu::maybe<R>, where R is the decayed type returned by calling an
// lvalue of F with no arguments.
template<class F>
struct maybe_from_result
{
    using decl_result_type = decltype((*static_cast<F*>(nullptr))());
    using result_type = rxu::decay_t<decl_result_type>;
    using type = rxu::maybe<result_type>;
};

}

// Calls f() and stores its result in a maybe. If the guarded call raises,
// the current exception is passed to the error callable `c` and the maybe is
// returned as it was default-constructed.
// RXCPP_TRY / RXCPP_CATCH are defined outside this file (presumably a
// try/catch pair -- confirm against their definition).
template<class F, class OnError>
auto on_exception(const F& f, const OnError& c)
    ->  typename std::enable_if<detail::is_on_error<OnError>::value, typename detail::maybe_from_result<F>::type>::type {
    using maybe_t = typename detail::maybe_from_result<F>::type;
    maybe_t outcome;
    RXCPP_TRY {
        outcome.reset(f());
    } RXCPP_CATCH(...) {
        c(rxu::current_exception());
    }
    return outcome;
}

// Calls f() and stores its result in a maybe. If the guarded call raises,
// the current exception is delivered to s.on_error and the maybe is returned
// as it was default-constructed.
// RXCPP_TRY / RXCPP_CATCH are defined outside this file (presumably a
// try/catch pair -- confirm against their definition).
template<class F, class Subscriber>
auto on_exception(const F& f, const Subscriber& s)
    ->  typename std::enable_if<is_subscriber<Subscriber>::value, typename detail::maybe_from_result<F>::type>::type {
    using maybe_t = typename detail::maybe_from_result<F>::type;
    maybe_t outcome;
    RXCPP_TRY {
        outcome.reset(f());
    } RXCPP_CATCH(...) {
        s.on_error(rxu::current_exception());
    }
    return outcome;
}

}

#endif