C++程序  |  241行  |  7.96 KB

// Copyright 2018 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.

// Can't compile this for Zircon userspace yet since libstdc++ isn't available.
#ifndef FIT_NO_STD_FOR_ZIRCON_USERSPACE

#include <condition_variable>
#include <mutex>

#include <lib/fit/single_threaded_executor.h>
#include <lib/fit/thread_safety.h>

namespace fit {

// The dispatcher runs tasks and provides the suspended task resolver.
//
// The lifetime of this object is somewhat complex since there are pointers
// to it from multiple sources which are released in different ways.
//
// - |single_threaded_executor| holds a pointer in |dispatcher_| which it releases
//   after calling |shutdown()| to inform the dispatcher of its own demise
// - |suspended_task| holds a pointer to the dispatcher's resolver
//   interface and the number of outstanding pointers corresponds to the
//   number of outstanding suspended task tickets tracked by |scheduler_|.
//
// The dispatcher deletes itself once all pointers have been released.
class single_threaded_executor::dispatcher_impl final
    : public suspended_task::resolver {
public:
    dispatcher_impl();

    void shutdown();
    void schedule_task(pending_task task);
    void run(context_impl& context);
    suspended_task suspend_current_task();

    suspended_task::ticket duplicate_ticket(
        suspended_task::ticket ticket) override;
    void resolve_ticket(
        suspended_task::ticket ticket, bool resume_task) override;

private:
    ~dispatcher_impl() override;

    void wait_for_runnable_tasks(
        fit::subtle::scheduler::task_queue* out_tasks);
    void run_task(pending_task* task, context& context);

    suspended_task::ticket current_task_ticket_ = 0;
    std::condition_variable wake_;

    // A bunch of state that is guarded by a mutex.
    struct {
        std::mutex mutex_;
        bool was_shutdown_ FIT_GUARDED(mutex_) = false;
        bool need_wake_ FIT_GUARDED(mutex_) = false;
        fit::subtle::scheduler scheduler_ FIT_GUARDED(mutex_);
    } guarded_;
};

single_threaded_executor::single_threaded_executor()
    : context_(this), dispatcher_(new dispatcher_impl()) {}

single_threaded_executor::~single_threaded_executor() {
    dispatcher_->shutdown();
}

void single_threaded_executor::schedule_task(pending_task task) {
    assert(task);
    dispatcher_->schedule_task(std::move(task));
}

void single_threaded_executor::run() {
    dispatcher_->run(context_);
}

single_threaded_executor::context_impl::context_impl(single_threaded_executor* executor)
    : executor_(executor) {}

single_threaded_executor::context_impl::~context_impl() = default;

single_threaded_executor* single_threaded_executor::context_impl::executor() const {
    return executor_;
}

suspended_task single_threaded_executor::context_impl::suspend_task() {
    return executor_->dispatcher_->suspend_current_task();
}

single_threaded_executor::dispatcher_impl::dispatcher_impl() = default;

single_threaded_executor::dispatcher_impl::~dispatcher_impl() {
    std::lock_guard<std::mutex> lock(guarded_.mutex_);
    assert(guarded_.was_shutdown_);
    assert(!guarded_.scheduler_.has_runnable_tasks());
    assert(!guarded_.scheduler_.has_suspended_tasks());
    assert(!guarded_.scheduler_.has_outstanding_tickets());
}

void single_threaded_executor::dispatcher_impl::shutdown() {
    fit::subtle::scheduler::task_queue tasks; // drop outside of the lock
    {
        std::lock_guard<std::mutex> lock(guarded_.mutex_);
        assert(!guarded_.was_shutdown_);
        guarded_.was_shutdown_ = true;
        guarded_.scheduler_.take_all_tasks(&tasks);
        if (guarded_.scheduler_.has_outstanding_tickets()) {
            return; // can't delete self yet
        }
    }

    // Must destroy self outside of the lock.
    delete this;
}

void single_threaded_executor::dispatcher_impl::schedule_task(pending_task task) {
    {
        std::lock_guard<std::mutex> lock(guarded_.mutex_);
        assert(!guarded_.was_shutdown_);
        guarded_.scheduler_.schedule_task(std::move(task));
        if (!guarded_.need_wake_) {
            return; // don't need to wake
        }
        guarded_.need_wake_ = false;
    }

    // It is more efficient to notify outside the lock.
    wake_.notify_one();
}

void single_threaded_executor::dispatcher_impl::run(context_impl& context) {
    fit::subtle::scheduler::task_queue tasks;
    for (;;) {
        wait_for_runnable_tasks(&tasks);
        if (tasks.empty()) {
            return; // all done!
        }

        do {
            run_task(&tasks.front(), context);
            tasks.pop(); // the task may be destroyed here if it was not suspended
        } while (!tasks.empty());
    }
}

// Must only be called while |run_task()| is running a task.
// This happens when the task's continuation calls |context::suspend_task()|
// upon the context it received as an argument.
suspended_task single_threaded_executor::dispatcher_impl::suspend_current_task() {
    std::lock_guard<std::mutex> lock(guarded_.mutex_);
    assert(!guarded_.was_shutdown_);
    if (current_task_ticket_ == 0) {
        current_task_ticket_ = guarded_.scheduler_.obtain_ticket(
            2 /*initial_refs*/);
    } else {
        guarded_.scheduler_.duplicate_ticket(current_task_ticket_);
    }
    return suspended_task(this, current_task_ticket_);
}

// Unfortunately std::unique_lock does not support thread-safety annotations
void single_threaded_executor::dispatcher_impl::wait_for_runnable_tasks(
    fit::subtle::scheduler::task_queue* out_tasks) FIT_NO_THREAD_SAFETY_ANALYSIS {
    std::unique_lock<std::mutex> lock(guarded_.mutex_);
    for (;;) {
        assert(!guarded_.was_shutdown_);
        guarded_.scheduler_.take_runnable_tasks(out_tasks);
        if (!out_tasks->empty()) {
            return; // got some tasks
        }
        if (!guarded_.scheduler_.has_suspended_tasks()) {
            return; // all done!
        }
        guarded_.need_wake_ = true;
        wake_.wait(lock);
        guarded_.need_wake_ = false;
    }
}

void single_threaded_executor::dispatcher_impl::run_task(pending_task* task,
                                                         context& context) {
    assert(current_task_ticket_ == 0);
    const bool finished = (*task)(context);
    assert(!*task == finished);
    (void)finished;
    if (current_task_ticket_ == 0) {
        return; // task was not suspended, no ticket was produced
    }

    std::lock_guard<std::mutex> lock(guarded_.mutex_);
    assert(!guarded_.was_shutdown_);
    guarded_.scheduler_.finalize_ticket(current_task_ticket_, task);
    current_task_ticket_ = 0;
}

suspended_task::ticket single_threaded_executor::dispatcher_impl::duplicate_ticket(
    suspended_task::ticket ticket) {
    std::lock_guard<std::mutex> lock(guarded_.mutex_);
    guarded_.scheduler_.duplicate_ticket(ticket);
    return ticket;
}

void single_threaded_executor::dispatcher_impl::resolve_ticket(
    suspended_task::ticket ticket, bool resume_task) {
    pending_task abandoned_task; // drop outside of the lock
    bool do_wake = false;
    {
        std::lock_guard<std::mutex> lock(guarded_.mutex_);
        if (resume_task) {
            guarded_.scheduler_.resume_task_with_ticket(ticket);
        } else {
            abandoned_task = guarded_.scheduler_.release_ticket(ticket);
        }
        if (guarded_.was_shutdown_) {
            assert(!guarded_.need_wake_);
            if (guarded_.scheduler_.has_outstanding_tickets()) {
                return; // can't shutdown yet
            }
        } else if (guarded_.need_wake_ &&
                   (guarded_.scheduler_.has_runnable_tasks() ||
                    !guarded_.scheduler_.has_suspended_tasks())) {
            guarded_.need_wake_ = false;
            do_wake = true;
        } else {
            return; // nothing else to do
        }
    }

    // Must do this outside of the lock.
    if (do_wake) {
        wake_.notify_one();
    } else {
        delete this;
    }
}

} // namespace fit

#endif // FIT_NO_STD_FOR_ZIRCON_USERSPACE