C++程序  |  139行  |  4.28 KB

//===-- llvm/Support/TaskQueue.h - A TaskQueue implementation ---*- C++ -*-===//
//
// Part of the LLVM Project, under the Apache License v2.0 with LLVM Exceptions.
// See https://llvm.org/LICENSE.txt for license information.
// SPDX-License-Identifier: Apache-2.0 WITH LLVM-exception
//
//===----------------------------------------------------------------------===//
//
// This file defines a crude C++11 based task queue.
//
//===----------------------------------------------------------------------===//

#ifndef LLVM_SUPPORT_TASK_QUEUE_H
#define LLVM_SUPPORT_TASK_QUEUE_H

#include "llvm/Config/llvm-config.h"
#include "llvm/Support/ThreadPool.h"
#include "llvm/Support/thread.h"

#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <utility>

namespace llvm {
/// TaskQueue executes serialized work on a user-defined Thread Pool.  It
/// guarantees that if task B is enqueued after task A, task B begins after
/// task A completes and there is no overlap between the two.
class TaskQueue {
  // Because we don't have init capture to use move-only local variables that
  // are captured into a lambda, we create the promise inside an explicit
  // callable struct. We want to do as much of the wrapping in the
  // type-specialized domain (before type erasure) and then erase this into a
  // std::function.
  template <typename Callable> struct Task {
    using ResultTy = typename std::result_of<Callable()>::type;
    explicit Task(Callable C, TaskQueue &Parent)
        : C(std::move(C)), P(std::make_shared<std::promise<ResultTy>>()),
          Parent(&Parent) {}

    template<typename T>
    void invokeCallbackAndSetPromise(T*) {
      P->set_value(C());
    }

    void invokeCallbackAndSetPromise(void*) {
      C();
      P->set_value();
    }

    void operator()() noexcept {
      ResultTy *Dummy = nullptr;
      invokeCallbackAndSetPromise(Dummy);
      Parent->completeTask();
    }

    Callable C;
    std::shared_ptr<std::promise<ResultTy>> P;
    TaskQueue *Parent;
  };

public:
  /// Construct a task queue with no work.
  TaskQueue(ThreadPool &Scheduler) : Scheduler(Scheduler) { (void)Scheduler; }

  /// Blocking destructor: the queue will wait for all work to complete.
  ~TaskQueue() {
    Scheduler.wait();
    assert(Tasks.empty());
  }

  /// Asynchronous submission of a task to the queue. The returned future can be
  /// used to wait for the task (and all previous tasks that have not yet
  /// completed) to finish.
  template <typename Callable>
  std::future<typename std::result_of<Callable()>::type> async(Callable &&C) {
#if !LLVM_ENABLE_THREADS
    static_assert(false,
                  "TaskQueue requires building with LLVM_ENABLE_THREADS!");
#endif
    Task<Callable> T{std::move(C), *this};
    using ResultTy = typename std::result_of<Callable()>::type;
    std::future<ResultTy> F = T.P->get_future();
    {
      std::lock_guard<std::mutex> Lock(QueueLock);
      // If there's already a task in flight, just queue this one up.  If
      // there is not a task in flight, bypass the queue and schedule this
      // task immediately.
      if (IsTaskInFlight)
        Tasks.push_back(std::move(T));
      else {
        Scheduler.async(std::move(T));
        IsTaskInFlight = true;
      }
    }
    return std::move(F);
  }

private:
  void completeTask() {
    // We just completed a task.  If there are no more tasks in the queue,
    // update IsTaskInFlight to false and stop doing work.  Otherwise
    // schedule the next task (while not holding the lock).
    std::function<void()> Continuation;
    {
      std::lock_guard<std::mutex> Lock(QueueLock);
      if (Tasks.empty()) {
        IsTaskInFlight = false;
        return;
      }

      Continuation = std::move(Tasks.front());
      Tasks.pop_front();
    }
    Scheduler.async(std::move(Continuation));
  }

  /// The thread pool on which to run the work.
  ThreadPool &Scheduler;

  /// State which indicates whether the queue currently is currently processing
  /// any work.
  bool IsTaskInFlight = false;

  /// Mutex for synchronizing access to the Tasks array.
  std::mutex QueueLock;

  /// Tasks waiting for execution in the queue.
  std::deque<std::function<void()>> Tasks;
};
} // namespace llvm

#endif // LLVM_SUPPORT_TASK_QUEUE_H