//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===// // // The LLVM Compiler Infrastructure // // This file is distributed under the University of Illinois Open Source // License. See LICENSE.TXT for details. // //===----------------------------------------------------------------------===// // // This file defines a crude C++11 based task queue. // //===----------------------------------------------------------------------===// #ifndef LLVM_SUPPORT_TASK_QUEUE_H #define LLVM_SUPPORT_TASK_QUEUE_H #include "llvm/Config/llvm-config.h" #include "llvm/Support/ThreadPool.h" #include "llvm/Support/thread.h" #include <atomic> #include <cassert> #include <condition_variable> #include <deque> #include <functional> #include <future> #include <memory> #include <mutex> #include <utility> namespace llvm { /// TaskQueue executes serialized work on a user-defined Thread Pool. It /// guarantees that if task B is enqueued after task A, task B begins after /// task A completes and there is no overlap between the two. class TaskQueue { // Because we don't have init capture to use move-only local variables that // are captured into a lambda, we create the promise inside an explicit // callable struct. We want to do as much of the wrapping in the // type-specialized domain (before type erasure) and then erase this into a // std::function. template <typename Callable> struct Task { using ResultTy = typename std::result_of<Callable()>::type; explicit Task(Callable C, TaskQueue &Parent) : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()), Parent(&Parent) {} template<typename T> void invokeCallbackAndSetPromise(T*) { P->set_value(C()); } void invokeCallbackAndSetPromise(void*) { C(); P->set_value(); } void operator()() noexcept { ResultTy *Dummy = nullptr; invokeCallbackAndSetPromise(Dummy); Parent->completeTask(); } Callable C; std::shared_ptr<std::promise<ResultTy>> P; TaskQueue *Parent; }; public: /// Construct a task queue with no work. TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; } /// Blocking destructor: the queue will wait for all work to complete. ~TaskQueue() { Scheduler.wait(); assert(Tasks.empty()); } /// Asynchronous submission of a task to the queue. The returned future can be /// used to wait for the task (and all previous tasks that have not yet /// completed) to finish. template <typename Callable> std::future<typename std::result_of<Callable()>::type> async(Callable &&C) { #if !LLVM_ENABLE_THREADS static_assert(false, "TaskQueue requires building with LLVM_ENABLE_THREADS!"); #endif Task<Callable> T{std::move(C), *this}; using ResultTy = typename std::result_of<Callable()>::type; std::future<ResultTy> F = T.P->get_future(); { std::lock_guard<std::mutex> Lock(QueueLock); // If there's already a task in flight, just queue this one up. If // there is not a task in flight, bypass the queue and schedule this // task immediately. if (IsTaskInFlight) Tasks.push_back(std::move(T)); else { Scheduler.async(std::move(T)); IsTaskInFlight = true; } } return std::move(F); } private: void completeTask() { // We just completed a task. If there are no more tasks in the queue, // update IsTaskInFlight to false and stop doing work. Otherwise // schedule the next task (while not holding the lock). std::function<void()> Continuation; { std::lock_guard<std::mutex> Lock(QueueLock); if (Tasks.empty()) { IsTaskInFlight = false; return; } Continuation = std::move(Tasks.front()); Tasks.pop_front(); } Scheduler.async(std::move(Continuation)); } /// The thread pool on which to run the work. ThreadPool &Scheduler; /// State which indicates whether the queue currently is currently processing /// any work. bool IsTaskInFlight = false; /// Mutex for synchronizing access to the Tasks array. std::mutex QueueLock; /// Tasks waiting for execution in the queue. std::deque<std::function<void()>> Tasks; }; } // namespace llvm #endif // LLVM_SUPPORT_TASK_QUEUE_H