/* * Copyright (C) 2014 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include "task_processor.h" #include "base/time_utils.h" #include "common_runtime_test.h" #include "thread-current-inl.h" #include "thread_pool.h" namespace art { namespace gc { class TaskProcessorTest : public CommonRuntimeTest { public: }; class RecursiveTask : public HeapTask { public: RecursiveTask(TaskProcessor* task_processor, Atomic<size_t>* counter, size_t max_recursion) : HeapTask(NanoTime() + MsToNs(10)), task_processor_(task_processor), counter_(counter), max_recursion_(max_recursion) { } void Run(Thread* self) override { if (max_recursion_ > 0) { task_processor_->AddTask(self, new RecursiveTask(task_processor_, counter_, max_recursion_ - 1)); counter_->fetch_add(1U, std::memory_order_seq_cst); } } private: TaskProcessor* const task_processor_; Atomic<size_t>* const counter_; const size_t max_recursion_; }; class WorkUntilDoneTask : public SelfDeletingTask { public: WorkUntilDoneTask(TaskProcessor* task_processor, Atomic<bool>* done_running) : task_processor_(task_processor), done_running_(done_running) { } void Run(Thread* self) override { task_processor_->RunAllTasks(self); done_running_->store(true, std::memory_order_seq_cst); } private: TaskProcessor* const task_processor_; Atomic<bool>* done_running_; }; TEST_F(TaskProcessorTest, Interrupt) { ThreadPool thread_pool("task processor test", 1U); Thread* const self = Thread::Current(); TaskProcessor task_processor; static constexpr size_t kRecursion = 10; Atomic<bool> done_running(false); Atomic<size_t> counter(0); task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion)); task_processor.Start(self); // Add a task which will wait until interrupted to the thread pool. thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running)); thread_pool.StartWorkers(self); ASSERT_FALSE(done_running); // Wait until all the tasks are done, but since we didn't interrupt, done_running should be 0. while (counter.load(std::memory_order_seq_cst) != kRecursion) { usleep(10); } ASSERT_FALSE(done_running); task_processor.Stop(self); thread_pool.Wait(self, true, false); // After the interrupt and wait, the WorkUntilInterruptedTasktask should have terminated and // set done_running_ to true. ASSERT_TRUE(done_running.load(std::memory_order_seq_cst)); // Test that we finish remaining tasks before returning from RunTasksUntilInterrupted. counter.store(0, std::memory_order_seq_cst); done_running.store(false, std::memory_order_seq_cst); // Self interrupt before any of the other tasks run, but since we added them we should keep on // working until all the tasks are completed. task_processor.Stop(self); task_processor.AddTask(self, new RecursiveTask(&task_processor, &counter, kRecursion)); thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running)); thread_pool.StartWorkers(self); thread_pool.Wait(self, true, false); ASSERT_TRUE(done_running.load(std::memory_order_seq_cst)); ASSERT_EQ(counter.load(std::memory_order_seq_cst), kRecursion); } class TestOrderTask : public HeapTask { public: TestOrderTask(uint64_t expected_time, size_t expected_counter, size_t* counter) : HeapTask(expected_time), expected_counter_(expected_counter), counter_(counter) { } void Run(Thread* thread ATTRIBUTE_UNUSED) override { ASSERT_EQ(*counter_, expected_counter_); ++*counter_; } private: const size_t expected_counter_; size_t* const counter_; }; TEST_F(TaskProcessorTest, Ordering) { static const size_t kNumTasks = 25; const uint64_t current_time = NanoTime(); Thread* const self = Thread::Current(); TaskProcessor task_processor; task_processor.Stop(self); size_t counter = 0; std::vector<std::pair<uint64_t, size_t>> orderings; for (size_t i = 0; i < kNumTasks; ++i) { orderings.push_back(std::make_pair(current_time + MsToNs(10U * i), i)); } for (size_t i = 0; i < kNumTasks; ++i) { std::swap(orderings[i], orderings[(i * 87654231 + 12345) % orderings.size()]); } for (const auto& pair : orderings) { auto* task = new TestOrderTask(pair.first, pair.second, &counter); task_processor.AddTask(self, task); } ThreadPool thread_pool("task processor test", 1U); Atomic<bool> done_running(false); // Add a task which will wait until interrupted to the thread pool. thread_pool.AddTask(self, new WorkUntilDoneTask(&task_processor, &done_running)); ASSERT_FALSE(done_running.load(std::memory_order_seq_cst)); thread_pool.StartWorkers(self); thread_pool.Wait(self, true, false); ASSERT_TRUE(done_running.load(std::memory_order_seq_cst)); ASSERT_EQ(counter, kNumTasks); } } // namespace gc } // namespace art