C++程序  |  144行  |  4.99 KB

/*
* Copyright (c) 2017, The Linux Foundation. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*  * Redistributions of source code must retain the above copyright
*    notice, this list of conditions and the following disclaimer.
*  * Redistributions in binary form must reproduce the above
*    copyright notice, this list of conditions and the following
*    disclaimer in the documentation and/or other materials provided
*    with the distribution.
*  * Neither the name of The Linux Foundation nor the names of its
*    contributors may be used to endorse or promote products derived
*    from this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NON-INFRINGEMENT
* ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
* WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
* OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
* IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/

#ifndef __SYNC_TASK_H__
#define __SYNC_TASK_H__

#include <thread>
#include <mutex>
#include <condition_variable>   // NOLINT

namespace sdm {

// Executes caller-supplied tasks on a dedicated worker thread while the posting thread blocks
// until the task has been handled, i.e. a synchronous hand-off of work to another thread.
//
// TaskCode is the caller-defined command type. It must be default constructible and copy
// assignable because the most recently posted code is stored inside this object.
//
// Usage notes:
//  - The worker thread is started by the constructor and joined by the destructor.
//  - The TaskHandler is held by reference and must outlive this object.
//  - TaskHandler::OnTask() runs on the worker thread with the worker mutex held, so it must not
//    call PerformTask() on the same SyncTask.
//  - NOTE(review): concurrent PerformTask() calls from several threads are not serialized here
//    (the caller mutex is released while a caller waits for completion); the class appears to
//    assume one posting thread at a time. Confirm against the callers.
template <class TaskCode>
class SyncTask {
 public:
  // Callers derive from this class to pass an arbitrary task context through PerformTask().
  class TaskContext {
   public:
    virtual ~TaskContext() { }
  };

  // Callback interface implemented by the caller; invoked on the worker thread for each task.
  class TaskHandler {
   public:
    virtual ~TaskHandler() { }
    // task_code and task_context are the values passed to the matching PerformTask() call.
    virtual void OnTask(const TaskCode &task_code, TaskContext *task_context) = 0;
  };

  // Starts the worker thread and blocks until it has signalled that it is running.
  explicit SyncTask(TaskHandler &task_handler) : task_handler_(task_handler) {
    // The caller lock is held while the thread is created so that the worker cannot publish
    // its readiness before this thread is able to observe it.
    std::unique_lock<std::mutex> caller_lock(caller_mutex_);
    worker_thread_ = std::thread(SyncTaskThread, this);

    // Wait on a flag rather than on the bare notification: condition variables may wake
    // spuriously, and returning early would let a caller race with worker start-up.
    caller_cv_.wait(caller_lock, [this] { return worker_ready_; });
  }

  // Asks the worker thread to exit and waits for it to finish.
  ~SyncTask() {
    // Task code does not matter here; the handler is not invoked for the terminate request.
    PerformTask(task_code_, nullptr, true);
    worker_thread_.join();
  }

  // The worker thread keeps a pointer to this object, so copying is not meaningful.
  SyncTask(const SyncTask &) = delete;
  SyncTask &operator=(const SyncTask &) = delete;

  // Posts task_code/task_context to the worker thread and blocks until
  // TaskHandler::OnTask() has returned for it.
  void PerformTask(const TaskCode &task_code, TaskContext *task_context) {
    PerformTask(task_code, task_context, false);
  }

 private:
  // Posts a request to the worker thread and blocks until the worker reports completion.
  // When terminate is true the worker skips the handler and leaves its loop.
  void PerformTask(const TaskCode &task_code, TaskContext *task_context, bool terminate) {
    std::unique_lock<std::mutex> caller_lock(caller_mutex_);

    // Reset the completion flag for this request. The worker can only set it while holding
    // the caller mutex, which it cannot acquire until this thread starts waiting below.
    task_done_ = false;

    // New scope to limit the worker lock to this block.
    {
      // Publish the request and wake the worker thread.
      std::lock_guard<std::mutex> worker_lock(worker_mutex_);
      task_code_ = task_code;
      task_context_ = task_context;
      worker_thread_exit_ = terminate;
      pending_code_ = true;
      worker_cv_.notify_one();
    }

    // Wait until the worker has finished this request. The predicate guards against spurious
    // wakeups, which would otherwise let this call return while the handler is still running.
    caller_cv_.wait(caller_lock, [this] { return task_done_; });
  }

  // Thread entry point; forwards to the member function of the owning object.
  static void SyncTaskThread(SyncTask *sync_task) {
    if (sync_task) {
      sync_task->OnThreadCallback();
    }
  }

  // Worker thread body: signals readiness, then handles posted requests until asked to exit.
  void OnThreadCallback() {
    // The worker lock is held for the lifetime of the thread except while waiting on
    // worker_cv_, so request state is only modified by callers while the worker is idle.
    std::unique_lock<std::mutex> worker_lock(worker_mutex_);

    // New scope to limit the caller lock to this block.
    {
      // Tell the constructor that the worker thread is up and running.
      std::lock_guard<std::mutex> caller_lock(caller_mutex_);
      worker_ready_ = true;
      caller_cv_.notify_one();
    }

    while (!worker_thread_exit_) {
      // Wait for a posted request; the predicate handles spurious wakeups and requests that
      // were posted before this wait started.
      worker_cv_.wait(worker_lock, [this] { return pending_code_; });

      // Call the task handler implemented by the caller, unless this is a terminate request.
      if (!worker_thread_exit_) {
        task_handler_.OnTask(task_code_, task_context_);
      }

      pending_code_ = false;

      // Report completion to the blocked caller thread.
      std::lock_guard<std::mutex> caller_lock(caller_mutex_);
      task_done_ = true;
      caller_cv_.notify_one();
    }
  }

  TaskHandler &task_handler_;
  // Value-initialized so that the destructor never copies an indeterminate value when no task
  // was posted.
  TaskCode task_code_{};
  TaskContext *task_context_ = nullptr;
  std::thread worker_thread_;
  std::mutex caller_mutex_;   // Guards worker_ready_ and task_done_.
  std::mutex worker_mutex_;   // Guards the posted request, worker_thread_exit_, pending_code_.
  std::condition_variable caller_cv_;
  std::condition_variable worker_cv_;
  bool worker_thread_exit_ = false;
  bool pending_code_ = false;
  bool worker_ready_ = false;
  bool task_done_ = false;
};

}  // namespace sdm

#endif  // __SYNC_TASK_H__