// Copyright 2017 The Fuchsia Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#ifndef _ALL_SOURCE
#define _ALL_SOURCE // Enables thrd_create_with_name in <threads.h>.
#endif
#include <lib/async-loop/loop.h>
#include <assert.h>
#include <stdatomic.h>
#include <stdlib.h>
#include <zircon/assert.h>
#include <zircon/listnode.h>
#include <zircon/syscalls.h>
#include <zircon/syscalls/hypervisor.h>
#include <lib/async/default.h>
#include <lib/async/exception.h>
#include <lib/async/receiver.h>
#include <lib/async/task.h>
#include <lib/async/trap.h>
#include <lib/async/wait.h>
// The port wait key associated with the dispatcher's control messages.
// Packets carrying this key are either wake-up packets queued by
// async_loop_wake_threads() or signals from the loop's task timer; packets
// with any other key carry the address of the client object they belong to.
#define KEY_CONTROL (0u)
// Forward declarations of the dispatcher operations that are installed in
// the |async_loop_ops| table. Each one receives the loop through its
// async_dispatcher_t pointer.
static zx_time_t async_loop_now(async_dispatcher_t* dispatcher);
static zx_status_t async_loop_begin_wait(async_dispatcher_t* dispatcher, async_wait_t* wait);
static zx_status_t async_loop_cancel_wait(async_dispatcher_t* dispatcher, async_wait_t* wait);
static zx_status_t async_loop_post_task(async_dispatcher_t* dispatcher, async_task_t* task);
static zx_status_t async_loop_cancel_task(async_dispatcher_t* dispatcher, async_task_t* task);
static zx_status_t async_loop_queue_packet(async_dispatcher_t* dispatcher, async_receiver_t* receiver,
const zx_packet_user_t* data);
static zx_status_t async_loop_set_guest_bell_trap(
async_dispatcher_t* dispatcher, async_guest_bell_trap_t* trap,
zx_handle_t guest, zx_vaddr_t addr, size_t length);
static zx_status_t async_loop_bind_exception_port(async_dispatcher_t* async,
async_exception_t* exception);
static zx_status_t async_loop_unbind_exception_port(async_dispatcher_t* async,
async_exception_t* exception);
static zx_status_t async_loop_resume_from_exception(async_dispatcher_t* async,
async_exception_t* exception,
zx_handle_t task,
uint32_t options);
// Operation table through which generic async_dispatcher_t clients reach
// this loop implementation. async_loop_create() points every new loop's
// dispatcher at this table.
static const async_ops_t async_loop_ops = {
    .version = ASYNC_OPS_V2,
    .reserved = 0,

    // Version 1 operations: clock, waits, tasks, packets and guest traps.
    .v1 = {
        .now = async_loop_now,
        .begin_wait = async_loop_begin_wait,
        .cancel_wait = async_loop_cancel_wait,
        .post_task = async_loop_post_task,
        .cancel_task = async_loop_cancel_task,
        .queue_packet = async_loop_queue_packet,
        .set_guest_bell_trap = async_loop_set_guest_bell_trap,
    },

    // Version 2 operations: exception port handling.
    .v2 = {
        .bind_exception_port = async_loop_bind_exception_port,
        .unbind_exception_port = async_loop_unbind_exception_port,
        .resume_from_exception = async_loop_resume_from_exception,
    },
};
// Bookkeeping record for one thread started by async_loop_start_thread().
// |node| must remain the first member: async_loop_join_threads() casts the
// list node returned by list_remove_head() directly to a thread_record_t*.
typedef struct thread_record {
list_node_t node; // link in the loop's |thread_list|
thrd_t thread; // handle later passed to thrd_join()
} thread_record_t;
// Configuration under which async_loop_create() registers the new loop as
// the calling thread's default dispatcher. All other fields are zero.
const async_loop_config_t kAsyncLoopConfigAttachToThread = {
    .make_default_for_current_thread = true,
};
// Configuration under which async_loop_create() leaves the calling thread's
// default dispatcher untouched. All other fields are zero.
const async_loop_config_t kAsyncLoopConfigNoAttachToThread = {
    .make_default_for_current_thread = false,
};
// State of one message loop. Instances are allocated by async_loop_create()
// and freed by async_loop_destroy(). The struct begins with its
// async_dispatcher_t so that loop and dispatcher pointers can be cast to one
// another.
typedef struct async_loop {
async_dispatcher_t dispatcher; // must be first (the loop inherits from async_dispatcher_t)
async_loop_config_t config; // immutable
zx_handle_t port; // immutable
zx_handle_t timer; // immutable
_Atomic async_loop_state_t state;
atomic_uint active_threads; // number of active dispatch threads
mtx_t lock; // guards the lists and the dispatching tasks flag
bool dispatching_tasks; // true while the loop is busy dispatching tasks
list_node_t wait_list; // most recently added first
list_node_t task_list; // pending tasks, earliest deadline first
list_node_t due_list; // due tasks, earliest deadline first
list_node_t thread_list; // earliest created thread first
list_node_t exception_list; // most recently added first
} async_loop_t;
// Forward declarations of the file-local helpers defined further down:
// the single-iteration runner, the per-object dispatch routines, and the
// task-queue / timer / hook utilities. Helpers with a |_locked| suffix are
// called with the loop's |lock| held.
static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline);
static zx_status_t async_loop_dispatch_wait(async_loop_t* loop, async_wait_t* wait,
zx_status_t status, const zx_packet_signal_t* signal);
static zx_status_t async_loop_dispatch_tasks(async_loop_t* loop);
static void async_loop_dispatch_task(async_loop_t* loop, async_task_t* task,
zx_status_t status);
static zx_status_t async_loop_dispatch_packet(async_loop_t* loop, async_receiver_t* receiver,
zx_status_t status, const zx_packet_user_t* data);
static zx_status_t async_loop_dispatch_guest_bell_trap(async_loop_t* loop,
async_guest_bell_trap_t* trap,
zx_status_t status,
const zx_packet_guest_bell_t* bell);
static zx_status_t async_loop_dispatch_exception(async_loop_t* loop,
async_exception_t* exception,
zx_status_t status,
const zx_port_packet_t* report);
static void async_loop_wake_threads(async_loop_t* loop);
static void async_loop_insert_task_locked(async_loop_t* loop, async_task_t* task);
static void async_loop_restart_timer_locked(async_loop_t* loop);
static void async_loop_invoke_prologue(async_loop_t* loop);
static void async_loop_invoke_epilogue(async_loop_t* loop);
// The loop links pending waits, tasks and exceptions into its lists by
// storing a list_node_t inside each object's |state| field (see TO_NODE),
// so that field must be large enough to hold one.
static_assert(sizeof(list_node_t) <= sizeof(async_state_t),
"async_state_t too small");
// Conversions between a client object (async_wait_t, async_task_t, ...) and
// the list node stored in its |state| field.
//
// TO_NODE(type, ptr):   pointer to |ptr|'s |state| field, viewed as a
//                       list_node_t. |type| is unused and kept only for
//                       symmetry with FROM_NODE.
// FROM_NODE(type, ptr): pointer to the |type| object whose |state| field is
//                       located at |ptr|.
//
// Every macro argument is parenthesized so that an expression argument such
// as |p + 1| expands correctly.
#define TO_NODE(type, ptr) ((list_node_t*)&(ptr)->state)
#define FROM_NODE(type, ptr) ((type*)((char*)(ptr) - offsetof(type, state)))
// Returns the list node stored in |wait|'s |state| field.
static inline list_node_t* wait_to_node(async_wait_t* wait) {
    return (list_node_t*)&wait->state;
}
// Recovers the async_wait_t whose |state| field is located at |node|.
static inline async_wait_t* node_to_wait(list_node_t* node) {
    char* state_addr = (char*)node;
    return (async_wait_t*)(state_addr - offsetof(async_wait_t, state));
}
// Returns the list node stored in |task|'s |state| field.
static inline list_node_t* task_to_node(async_task_t* task) {
    return (list_node_t*)&task->state;
}
// Recovers the async_task_t whose |state| field is located at |node|.
static inline async_task_t* node_to_task(list_node_t* node) {
    char* state_addr = (char*)node;
    return (async_task_t*)(state_addr - offsetof(async_task_t, state));
}
// Returns the list node stored in |exception|'s |state| field.
static inline list_node_t* exception_to_node(async_exception_t* exception) {
    return (list_node_t*)&exception->state;
}
// Recovers the async_exception_t whose |state| field is located at |node|.
static inline async_exception_t* node_to_exception(list_node_t* node) {
    char* state_addr = (char*)node;
    return (async_exception_t*)(state_addr - offsetof(async_exception_t, state));
}
// Allocates and initializes a new loop configured by a copy of |*config|.
//
// On success stores the loop in |*out_loop|, registers it as the calling
// thread's default dispatcher if the configuration asks for that, and
// returns ZX_OK. On failure |*out_loop| is not written, the partially built
// loop is destroyed, and the failing status is returned (ZX_ERR_NO_MEMORY
// when the allocation itself fails).
zx_status_t async_loop_create(const async_loop_config_t* config, async_loop_t** out_loop) {
    ZX_DEBUG_ASSERT(out_loop);
    ZX_DEBUG_ASSERT(config != NULL);

    async_loop_t* loop = calloc(1u, sizeof(async_loop_t));
    if (loop == NULL) {
        return ZX_ERR_NO_MEMORY;
    }

    // In-memory state first; none of this can fail in a way we check.
    atomic_init(&loop->state, ASYNC_LOOP_RUNNABLE);
    atomic_init(&loop->active_threads, 0u);
    loop->dispatcher.ops = &async_loop_ops;
    loop->config = *config;
    mtx_init(&loop->lock, mtx_plain);
    list_initialize(&loop->wait_list);
    list_initialize(&loop->task_list);
    list_initialize(&loop->due_list);
    list_initialize(&loop->thread_list);
    list_initialize(&loop->exception_list);

    // Kernel objects next: the port, the task timer, and a repeating wait
    // that reports timer expirations on the port under KEY_CONTROL.
    zx_status_t status = zx_port_create(0u, &loop->port);
    if (status != ZX_OK) {
        goto fail;
    }
    status = zx_timer_create(0u, ZX_CLOCK_MONOTONIC, &loop->timer);
    if (status != ZX_OK) {
        goto fail;
    }
    status = zx_object_wait_async(loop->timer, loop->port, KEY_CONTROL,
                                  ZX_TIMER_SIGNALED,
                                  ZX_WAIT_ASYNC_REPEATING);
    if (status != ZX_OK) {
        goto fail;
    }

    *out_loop = loop;
    if (loop->config.make_default_for_current_thread) {
        ZX_DEBUG_ASSERT(async_get_default_dispatcher() == NULL);
        async_set_default_dispatcher(&loop->dispatcher);
    }
    return status;

fail:
    // The loop was never installed as the default dispatcher, so make sure
    // the shutdown path does not try to uninstall it.
    loop->config.make_default_for_current_thread = false;
    async_loop_destroy(loop);
    return status;
}
// Shuts |loop| down (which does nothing if it is already shut down), then
// closes its port and timer, destroys its lock and frees its memory.
// |loop| must not be used after this call.
void async_loop_destroy(async_loop_t* loop) {
    ZX_DEBUG_ASSERT(loop);

    async_loop_shutdown(loop);

    zx_handle_close(loop->port);
    zx_handle_close(loop->timer);
    mtx_destroy(&loop->lock);

    free(loop);
}
// Moves |loop| into the shutdown state. The first caller to do so wakes and
// joins the loop's threads, then invokes the handler of every remaining
// wait, due task, pending task and exception with ZX_ERR_CANCELED, and
// finally clears the thread's default dispatcher if the loop was configured
// to install itself there. Later calls return immediately.
void async_loop_shutdown(async_loop_t* loop) {
    ZX_DEBUG_ASSERT(loop);

    if (atomic_exchange_explicit(&loop->state, ASYNC_LOOP_SHUTDOWN,
                                 memory_order_acq_rel) == ASYNC_LOOP_SHUTDOWN) {
        return;
    }

    async_loop_wake_threads(loop);
    async_loop_join_threads(loop);

    // Drain each list one entry at a time, re-reading the head after every
    // handler invocation.
    list_node_t* entry;
    while ((entry = list_remove_head(&loop->wait_list)) != NULL) {
        async_loop_dispatch_wait(loop, node_to_wait(entry), ZX_ERR_CANCELED, NULL);
    }
    while ((entry = list_remove_head(&loop->due_list)) != NULL) {
        async_loop_dispatch_task(loop, node_to_task(entry), ZX_ERR_CANCELED);
    }
    while ((entry = list_remove_head(&loop->task_list)) != NULL) {
        async_loop_dispatch_task(loop, node_to_task(entry), ZX_ERR_CANCELED);
    }
    while ((entry = list_remove_head(&loop->exception_list)) != NULL) {
        async_loop_dispatch_exception(loop, node_to_exception(entry),
                                      ZX_ERR_CANCELED, NULL);
    }

    if (!loop->config.make_default_for_current_thread) {
        return;
    }
    ZX_DEBUG_ASSERT(async_get_default_dispatcher() == &loop->dispatcher);
    async_set_default_dispatcher(NULL);
}
// Runs |loop| on the calling thread. Each iteration waits for one port
// packet until |deadline| and dispatches it. Iteration stops after the first
// pass when |once| is true, otherwise as soon as an iteration returns
// something other than ZX_OK. The status of the last iteration is returned.
// The calling thread is counted in |active_threads| for the duration.
zx_status_t async_loop_run(async_loop_t* loop, zx_time_t deadline, bool once) {
    ZX_DEBUG_ASSERT(loop);

    atomic_fetch_add_explicit(&loop->active_threads, 1u, memory_order_acq_rel);

    zx_status_t status;
    for (;;) {
        status = async_loop_run_once(loop, deadline);
        if (once || status != ZX_OK) {
            break;
        }
    }

    atomic_fetch_sub_explicit(&loop->active_threads, 1u, memory_order_acq_rel);
    return status;
}
// Runs |loop| with a deadline of 0 until an iteration fails. Running out of
// ready packets (ZX_ERR_TIMED_OUT) is the expected way to finish and is
// reported as ZX_OK; any other status is returned unchanged.
zx_status_t async_loop_run_until_idle(async_loop_t* loop) {
    const zx_status_t status = async_loop_run(loop, 0, false);
    return (status == ZX_ERR_TIMED_OUT) ? ZX_OK : status;
}
// Performs one loop iteration: waits on the port until |deadline| for a
// single packet and dispatches it according to its key and type.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, ZX_ERR_CANCELED if it
// is in any other non-runnable state, the zx_port_wait() status if the wait
// fails, and otherwise the result of the dispatch routine. A packet that
// matches no known category trips a debug assertion and yields
// ZX_ERR_INTERNAL.
static zx_status_t async_loop_run_once(async_loop_t* loop, zx_time_t deadline) {
    const async_loop_state_t state =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN) {
        return ZX_ERR_BAD_STATE;
    }
    if (state != ASYNC_LOOP_RUNNABLE) {
        return ZX_ERR_CANCELED;
    }

    zx_port_packet_t packet;
    const zx_status_t wait_status = zx_port_wait(loop->port, deadline, &packet);
    if (wait_status != ZX_OK) {
        return wait_status;
    }

    if (packet.key == KEY_CONTROL) {
        // Wake-up packet: nothing to do beyond returning to the caller.
        if (packet.type == ZX_PKT_TYPE_USER) {
            return ZX_OK;
        }
        // Task timer expiration.
        const bool timer_fired = packet.type == ZX_PKT_TYPE_SIGNAL_REP &&
                                 (packet.signal.observed & ZX_TIMER_SIGNALED);
        if (timer_fired) {
            return async_loop_dispatch_tasks(loop);
        }
    } else {
        // Every non-control key is the address of the client object that
        // registered for the packet.
        void* const client = (void*)(uintptr_t)packet.key;

        // Wait completion: unlink the wait before invoking its handler.
        if (packet.type == ZX_PKT_TYPE_SIGNAL_ONE) {
            async_wait_t* wait = client;
            mtx_lock(&loop->lock);
            list_delete(wait_to_node(wait));
            mtx_unlock(&loop->lock);
            return async_loop_dispatch_wait(loop, wait, packet.status, &packet.signal);
        }
        // Queued user packet.
        if (packet.type == ZX_PKT_TYPE_USER) {
            async_receiver_t* receiver = client;
            return async_loop_dispatch_packet(loop, receiver, packet.status, &packet.user);
        }
        // Guest bell trap.
        if (packet.type == ZX_PKT_TYPE_GUEST_BELL) {
            async_guest_bell_trap_t* trap = client;
            return async_loop_dispatch_guest_bell_trap(
                loop, trap, packet.status, &packet.guest_bell);
        }
        // Exception report.
        if (ZX_PKT_IS_EXCEPTION(packet.type)) {
            async_exception_t* exception = client;
            return async_loop_dispatch_exception(loop, exception, packet.status,
                                                 &packet);
        }
    }

    ZX_DEBUG_ASSERT(false);
    return ZX_ERR_INTERNAL;
}
// Returns |loop| viewed as its generic dispatcher interface.
async_dispatcher_t* async_loop_get_dispatcher(async_loop_t* loop) {
    // Note: async_loop_t begins with its async_dispatcher_t, so the loop
    // pointer can be upcast directly.
    return (async_dispatcher_t*)loop;
}
// Returns the loop that |async| belongs to by casting the pointer back to
// async_loop_t. The cast performs no checking.
// NOTE(review): presumably only valid for dispatchers obtained from an
// async_loop_t - confirm against callers.
async_loop_t* async_loop_from_dispatcher(async_dispatcher_t* async) {
    return (async_loop_t*)async;
}
// Invokes |trap|'s handler with |status| and |bell|, bracketed by the
// loop's prologue and epilogue hooks. Always returns ZX_OK.
static zx_status_t async_loop_dispatch_guest_bell_trap(async_loop_t* loop,
                                                       async_guest_bell_trap_t* trap,
                                                       zx_status_t status,
                                                       const zx_packet_guest_bell_t* bell) {
    async_dispatcher_t* const dispatcher = &loop->dispatcher;

    async_loop_invoke_prologue(loop);
    trap->handler(dispatcher, trap, status, bell);
    async_loop_invoke_epilogue(loop);

    return ZX_OK;
}
// Invokes |wait|'s handler with |status| and |signal|, bracketed by the
// loop's prologue and epilogue hooks. Always returns ZX_OK.
static zx_status_t async_loop_dispatch_wait(async_loop_t* loop, async_wait_t* wait,
                                            zx_status_t status,
                                            const zx_packet_signal_t* signal) {
    async_dispatcher_t* const dispatcher = &loop->dispatcher;

    async_loop_invoke_prologue(loop);
    wait->handler(dispatcher, wait, status, signal);
    async_loop_invoke_epilogue(loop);

    return ZX_OK;
}
// Dispatches the tasks that have come due. Called by async_loop_run_once()
// when the task timer fires. If another thread is already dispatching tasks
// (|dispatching_tasks| is set) this call does nothing. Always returns ZX_OK.
static zx_status_t async_loop_dispatch_tasks(async_loop_t* loop) {
// Dequeue and dispatch one task at a time in case an earlier task wants
// to cancel a later task which has also come due. At most one thread
// can dispatch tasks at any given moment (to preserve serial ordering).
// Timer restarts are suppressed until we run out of tasks to dispatch.
mtx_lock(&loop->lock);
if (!loop->dispatching_tasks) {
loop->dispatching_tasks = true;
// Extract all of the tasks that are due into |due_list| for dispatch
// unless we already have some waiting from a previous iteration which
// we would like to process in order.
list_node_t* node;
if (list_is_empty(&loop->due_list)) {
zx_time_t due_time = async_loop_now((async_dispatcher_t*)loop);
// Find the last task whose deadline is not later than |due_time|.
// |task_list| is ordered by deadline, so the scan stops at the first
// task that is not yet due.
list_node_t* tail = NULL;
list_for_every(&loop->task_list, node) {
if (node_to_task(node)->deadline > due_time)
break;
tail = node;
}
if (tail) {
// Splice the run [head, tail] out of |task_list| and make it the
// entire content of |due_list| (which is empty at this point).
list_node_t* head = loop->task_list.next;
loop->task_list.next = tail->next;
tail->next->prev = &loop->task_list;
loop->due_list.next = head;
head->prev = &loop->due_list;
loop->due_list.prev = tail;
tail->next = &loop->due_list;
}
}
// Dispatch all due tasks. Note that they might be canceled concurrently
// so we need to grab the lock during each iteration to fetch the next
// item from the list.
while ((node = list_remove_head(&loop->due_list))) {
// The handler runs without the lock held.
mtx_unlock(&loop->lock);
// Invoke the handler. Note that it might destroy itself.
async_task_t* task = node_to_task(node);
async_loop_dispatch_task(loop, task, ZX_OK);
mtx_lock(&loop->lock);
// Stop early once the loop is no longer runnable; any tasks that are
// still due stay on |due_list|.
async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
if (state != ASYNC_LOOP_RUNNABLE)
break;
}
loop->dispatching_tasks = false;
// Re-arm the timer: it fires immediately if due tasks remain, otherwise
// at the earliest pending deadline (see async_loop_restart_timer_locked).
async_loop_restart_timer_locked(loop);
}
mtx_unlock(&loop->lock);
return ZX_OK;
}
// Invokes |task|'s handler with |status|, bracketed by the loop's prologue
// and epilogue hooks.
static void async_loop_dispatch_task(async_loop_t* loop,
                                     async_task_t* task,
                                     zx_status_t status) {
    async_dispatcher_t* const dispatcher = &loop->dispatcher;

    async_loop_invoke_prologue(loop);
    // Note: the handler might destroy itself, so |task| is not touched again.
    task->handler(dispatcher, task, status);
    async_loop_invoke_epilogue(loop);
}
// Invokes |receiver|'s handler with |status| and |data|, bracketed by the
// loop's prologue and epilogue hooks. Always returns ZX_OK.
static zx_status_t async_loop_dispatch_packet(async_loop_t* loop, async_receiver_t* receiver,
                                              zx_status_t status,
                                              const zx_packet_user_t* data) {
    async_dispatcher_t* const dispatcher = &loop->dispatcher;

    async_loop_invoke_prologue(loop);
    // Note: the handler might destroy itself, so |receiver| is not touched again.
    receiver->handler(dispatcher, receiver, status, data);
    async_loop_invoke_epilogue(loop);

    return ZX_OK;
}
// Invokes |exception|'s handler with |status| and |report|, bracketed by
// the loop's prologue and epilogue hooks. Always returns ZX_OK.
static zx_status_t async_loop_dispatch_exception(async_loop_t* loop,
                                                 async_exception_t* exception,
                                                 zx_status_t status,
                                                 const zx_port_packet_t* report) {
    async_dispatcher_t* const dispatcher = &loop->dispatcher;

    async_loop_invoke_prologue(loop);
    // Note: the handler might destroy itself, so |exception| is not touched again.
    exception->handler(dispatcher, exception, status, report);
    async_loop_invoke_epilogue(loop);

    return ZX_OK;
}
// Requests that |loop| stop running: atomically moves it from the runnable
// state to the quit state and wakes its threads so they notice. Does
// nothing if the loop is not currently runnable.
void async_loop_quit(async_loop_t* loop) {
    ZX_DEBUG_ASSERT(loop);

    async_loop_state_t expected_state = ASYNC_LOOP_RUNNABLE;
    const bool transitioned = atomic_compare_exchange_strong_explicit(
        &loop->state, &expected_state, ASYNC_LOOP_QUIT,
        memory_order_acq_rel, memory_order_acquire);
    if (transitioned) {
        async_loop_wake_threads(loop);
    }
}
// Queues one KEY_CONTROL user packet on the port per active thread so that
// every thread blocked in the port wait returns.
//
// This is safe because a thread joining the pool increments the active
// thread count before it checks the loop state, so the count read here
// cannot be less than the number of threads that might be blocked. Queuing
// more packets than needed is harmless.
static void async_loop_wake_threads(async_loop_t* loop) {
    uint32_t remaining = atomic_load_explicit(&loop->active_threads, memory_order_acquire);
    while (remaining > 0u) {
        zx_port_packet_t packet = {
            .key = KEY_CONTROL,
            .type = ZX_PKT_TYPE_USER,
            .status = ZX_OK,
        };
        zx_status_t status = zx_port_queue(loop->port, &packet);
        ZX_ASSERT_MSG(status == ZX_OK, "zx_port_queue: status=%d", status);
        remaining--;
    }
}
// Moves |loop| from the quit state back to the runnable state.
//
// Returns ZX_OK if the loop was quit and is now runnable, or if it was
// already runnable. Returns ZX_ERR_BAD_STATE if any thread is still active
// in the loop or if the loop is in any other state.
zx_status_t async_loop_reset_quit(async_loop_t* loop) {
    ZX_DEBUG_ASSERT(loop);

    // Refuse while threads are still running the loop. This check is
    // inherently racy but not dangerously so; it is mainly a sanity check
    // for client code so that a stronger statement can be made about how
    // |async_loop_reset_quit()| is supposed to be used.
    if (atomic_load_explicit(&loop->active_threads, memory_order_acquire) != 0) {
        return ZX_ERR_BAD_STATE;
    }

    async_loop_state_t expected_state = ASYNC_LOOP_QUIT;
    const bool was_quit = atomic_compare_exchange_strong_explicit(
        &loop->state, &expected_state, ASYNC_LOOP_RUNNABLE,
        memory_order_acq_rel, memory_order_acquire);
    if (!was_quit) {
        // Not in the quit state: acceptable only if already runnable.
        const async_loop_state_t current =
            atomic_load_explicit(&loop->state, memory_order_acquire);
        if (current != ASYNC_LOOP_RUNNABLE) {
            return ZX_ERR_BAD_STATE;
        }
    }
    return ZX_OK;
}
// Returns a snapshot of |loop|'s current state, read with acquire ordering.
async_loop_state_t async_loop_get_state(async_loop_t* loop) {
    ZX_DEBUG_ASSERT(loop);

    const async_loop_state_t current =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    return current;
}
// Dispatcher |now| operation: returns the current monotonic clock reading.
// Declared |static| to match its forward declaration at the top of the
// file; the function is only reachable through |async_loop_ops|.
static zx_time_t async_loop_now(async_dispatcher_t* dispatcher) {
    (void)dispatcher; // The clock does not depend on which loop is asking.
    return zx_clock_get_monotonic();
}
// Dispatcher |begin_wait| operation: registers a one-shot wait for
// |wait->trigger| on |wait->object| with the loop's port, keyed by the
// address of |wait|, and records the wait in |wait_list|.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, otherwise the status
// of zx_object_wait_async(). Any failure other than ZX_ERR_ACCESS_DENIED
// trips an assertion.
static zx_status_t async_loop_begin_wait(async_dispatcher_t* async, async_wait_t* wait) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(wait);

    const async_loop_state_t state =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN) {
        return ZX_ERR_BAD_STATE;
    }

    mtx_lock(&loop->lock);
    const zx_status_t status = zx_object_wait_async(
        wait->object, loop->port, (uintptr_t)wait, wait->trigger, ZX_WAIT_ASYNC_ONCE);
    if (status != ZX_OK) {
        ZX_ASSERT_MSG(status == ZX_ERR_ACCESS_DENIED,
                      "zx_object_wait_async: status=%d", status);
    } else {
        list_add_head(&loop->wait_list, wait_to_node(wait));
    }
    mtx_unlock(&loop->lock);

    return status;
}
// Dispatcher |cancel_wait| operation: cancels a pending wait.
//
// Returns ZX_ERR_NOT_FOUND if |wait| is not on a list, otherwise the status
// of zx_port_cancel(). Any failure other than ZX_ERR_NOT_FOUND trips an
// assertion.
//
// Note: cancelations are processed even while the loop is being destroyed,
// in case the client is counting on the handler not being invoked again
// past this point.
static zx_status_t async_loop_cancel_wait(async_dispatcher_t* async, async_wait_t* wait) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(wait);

    zx_status_t status;
    mtx_lock(&loop->lock);

    list_node_t* node = wait_to_node(wait);
    if (!list_in_list(node)) {
        // The wait is not pending at all.
        status = ZX_ERR_NOT_FOUND;
    } else {
        // This may be racing with another thread that has read the wait's
        // packet but not yet dispatched it, so a failure to cancel is taken
        // to mean that the race was lost.
        status = zx_port_cancel(loop->port, wait->object, (uintptr_t)wait);
        if (status != ZX_OK) {
            ZX_ASSERT_MSG(status == ZX_ERR_NOT_FOUND,
                          "zx_port_cancel: status=%d", status);
        } else {
            list_delete(node);
        }
    }

    mtx_unlock(&loop->lock);
    return status;
}
// Dispatcher |post_task| operation: inserts |task| into the deadline-ordered
// |task_list| and, when it became the new head while no thread is
// dispatching tasks, restarts the timer for the new earliest deadline.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, ZX_OK otherwise.
static zx_status_t async_loop_post_task(async_dispatcher_t* async, async_task_t* task) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(task);

    const async_loop_state_t state =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN) {
        return ZX_ERR_BAD_STATE;
    }

    mtx_lock(&loop->lock);
    async_loop_insert_task_locked(loop, task);
    const bool became_head = task_to_node(task)->prev == &loop->task_list;
    if (became_head && !loop->dispatching_tasks) {
        // Earliest deadline changed.
        async_loop_restart_timer_locked(loop);
    }
    mtx_unlock(&loop->lock);

    return ZX_OK;
}
// Dispatcher |cancel_task| operation: removes |task| from whichever list it
// is on.
//
// Returns ZX_ERR_NOT_FOUND if |task| is not on a list, ZX_OK otherwise.
//
// Note: cancelations are processed even while the loop is being destroyed,
// in case the client is counting on the handler not being invoked again
// past this point. The task may be on |due_list| (pending dispatch) rather
// than on |task_list|; the same logic works in both cases.
static zx_status_t async_loop_cancel_task(async_dispatcher_t* async, async_task_t* task) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(task);

    zx_status_t status = ZX_ERR_NOT_FOUND;
    mtx_lock(&loop->lock);

    list_node_t* node = task_to_node(task);
    if (list_in_list(node)) {
        // The timer only needs bumping when the head of |task_list| is
        // removed, a successor exists, and that successor is due later.
        bool must_restart = false;
        if (!loop->dispatching_tasks &&
            node->prev == &loop->task_list &&
            node->next != &loop->task_list) {
            must_restart = node_to_task(node->next)->deadline > task->deadline;
        }

        list_delete(node);
        if (must_restart) {
            async_loop_restart_timer_locked(loop);
        }
        status = ZX_OK;
    }

    mtx_unlock(&loop->lock);
    return status;
}
// Dispatcher |queue_packet| operation: queues a user packet on the loop's
// port, keyed by the address of |receiver|. The payload is copied from
// |data| when it is non-NULL and left zero-initialized otherwise.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, otherwise the status
// of zx_port_queue().
static zx_status_t async_loop_queue_packet(async_dispatcher_t* async, async_receiver_t* receiver,
                                           const zx_packet_user_t* data) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(receiver);

    const async_loop_state_t state =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN) {
        return ZX_ERR_BAD_STATE;
    }

    zx_port_packet_t packet = {
        .key = (uintptr_t)receiver,
        .type = ZX_PKT_TYPE_USER,
        .status = ZX_OK,
    };
    if (data != NULL) {
        packet.user = *data;
    }
    return zx_port_queue(loop->port, &packet);
}
// Dispatcher |set_guest_bell_trap| operation: installs a bell trap on
// |guest| covering |length| bytes at |addr|, delivered to the loop's port
// and keyed by the address of |trap|.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, otherwise the status
// of zx_guest_set_trap(). A failure status outside the expected set listed
// below trips an assertion.
static zx_status_t async_loop_set_guest_bell_trap(
    async_dispatcher_t* async, async_guest_bell_trap_t* trap,
    zx_handle_t guest, zx_vaddr_t addr, size_t length) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(trap);

    const async_loop_state_t state =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN) {
        return ZX_ERR_BAD_STATE;
    }

    const zx_status_t status = zx_guest_set_trap(guest, ZX_GUEST_TRAP_BELL, addr,
                                                 length, loop->port, (uintptr_t)trap);
    if (status == ZX_OK) {
        return status;
    }

    ZX_ASSERT_MSG(status == ZX_ERR_ACCESS_DENIED ||
                      status == ZX_ERR_ALREADY_EXISTS ||
                      status == ZX_ERR_INVALID_ARGS ||
                      status == ZX_ERR_OUT_OF_RANGE ||
                      status == ZX_ERR_WRONG_TYPE,
                  "zx_guest_set_trap: status=%d", status);
    return status;
}
// Dispatcher |bind_exception_port| operation: binds the loop's port as the
// exception port of |exception->task|, keyed by the address of |exception|,
// and records the binding in |exception_list| on success.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, otherwise the status
// of zx_task_bind_exception_port().
static zx_status_t async_loop_bind_exception_port(async_dispatcher_t* async,
                                                  async_exception_t* exception) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(exception);

    const async_loop_state_t state =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN) {
        return ZX_ERR_BAD_STATE;
    }

    const uint64_t key = (uintptr_t)(void*)exception;

    mtx_lock(&loop->lock);
    const zx_status_t status = zx_task_bind_exception_port(
        exception->task, loop->port, key, exception->options);
    if (status == ZX_OK) {
        list_add_head(&loop->exception_list, exception_to_node(exception));
    }
    mtx_unlock(&loop->lock);

    return status;
}
// Dispatcher |unbind_exception_port| operation: detaches the exception port
// of |exception->task| by rebinding it to ZX_HANDLE_INVALID and, on success,
// removes |exception| from its list.
//
// Returns ZX_ERR_NOT_FOUND if |exception| is not on a list, otherwise the
// status of zx_task_bind_exception_port().
//
// Note: unbindings are processed even while the loop is being destroyed, in
// case the client is counting on the handler not being invoked again past
// this point.
static zx_status_t async_loop_unbind_exception_port(async_dispatcher_t* async,
                                                    async_exception_t* exception) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(exception);

    zx_status_t status = ZX_ERR_NOT_FOUND;
    mtx_lock(&loop->lock);

    // Only attempt the unbind if the port is actually bound.
    list_node_t* node = exception_to_node(exception);
    if (list_in_list(node)) {
        const uint64_t key = (uintptr_t)(void*)exception;
        status = zx_task_bind_exception_port(exception->task,
                                             ZX_HANDLE_INVALID, key, 0);
        if (status == ZX_OK) {
            list_delete(node);
        }
    }

    mtx_unlock(&loop->lock);
    return status;
}
// Dispatcher |resume_from_exception| operation: resumes |task| from an
// exception delivered through the loop's port, passing |options| through.
// |exception| is only checked for being non-NULL.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, otherwise the status
// of zx_task_resume_from_exception().
static zx_status_t async_loop_resume_from_exception(async_dispatcher_t* async,
                                                    async_exception_t* exception,
                                                    zx_handle_t task,
                                                    uint32_t options) {
    async_loop_t* loop = (async_loop_t*)async;
    ZX_DEBUG_ASSERT(loop);
    ZX_DEBUG_ASSERT(exception);

    const async_loop_state_t state =
        atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN) {
        return ZX_ERR_BAD_STATE;
    }

    return zx_task_resume_from_exception(task, loop->port, options);
}
// Inserts |task| into |task_list| so that the list stays ordered by
// deadline, earliest first. A task is placed after any existing tasks with
// an equal deadline. The caller must hold |lock|.
static void async_loop_insert_task_locked(async_loop_t* loop, async_task_t* task) {
    // TODO(ZX-976): We assume that tasks are inserted in quasi-monotonic order and
    // that insertion into the task queue will typically take no more than a few steps.
    // If this assumption proves false and the cost of insertion becomes a problem, we
    // should consider using a more efficient representation for maintaining order.

    // Walk backwards from the tail past every task that is due later.
    list_node_t* position = loop->task_list.prev;
    while (position != &loop->task_list &&
           task->deadline < node_to_task(position)->deadline) {
        position = position->prev;
    }
    list_add_after(position, task_to_node(task));
}
// Re-arms the loop's timer. If |due_list| still holds tasks the timer is
// set to fire immediately; otherwise it is set to the deadline of the first
// task in |task_list|. Nothing is done when there are no pending tasks or
// when that deadline is ZX_TIME_INFINITE. The caller must hold |lock|.
static void async_loop_restart_timer_locked(async_loop_t* loop) {
    // Default: fire now (used when due tasks are still waiting).
    zx_time_t deadline = 0ULL;

    if (list_is_empty(&loop->due_list)) {
        list_node_t* first = list_peek_head(&loop->task_list);
        if (first == NULL) {
            return;
        }
        deadline = node_to_task(first)->deadline;
        if (deadline == ZX_TIME_INFINITE) {
            return;
        }
    }

    zx_status_t status = zx_timer_set(loop->timer, deadline, 0);
    ZX_ASSERT_MSG(status == ZX_OK, "zx_timer_set: status=%d", status);
}
// Calls the configured prologue hook, if any, with the loop and the
// configuration's |data| pointer.
static void async_loop_invoke_prologue(async_loop_t* loop) {
    if (!loop->config.prologue) {
        return;
    }
    loop->config.prologue(loop, loop->config.data);
}
// Calls the configured epilogue hook, if any, with the loop and the
// configuration's |data| pointer.
static void async_loop_invoke_epilogue(async_loop_t* loop) {
    if (!loop->config.epilogue) {
        return;
    }
    loop->config.epilogue(loop, loop->config.data);
}
// Entry point of threads created by async_loop_start_thread(). |data| is
// the loop. Makes the loop the new thread's default dispatcher, then runs
// it with no deadline until an iteration fails. Always returns 0.
static int async_loop_run_thread(void* data) {
    async_loop_t* const loop = data;

    async_set_default_dispatcher(&loop->dispatcher);
    (void)async_loop_run(loop, ZX_TIME_INFINITE, false);

    return 0;
}
// Starts a new thread named |name| that runs |loop| and records it so that
// async_loop_join_threads() can join it later. If |out_thread| is non-NULL
// it receives the new thread's handle.
//
// Returns ZX_ERR_BAD_STATE if the loop is shut down, ZX_ERR_NO_MEMORY if
// the bookkeeping record cannot be allocated or the thread cannot be
// created, and ZX_OK otherwise.
zx_status_t async_loop_start_thread(async_loop_t* loop, const char* name, thrd_t* out_thread) {
    ZX_DEBUG_ASSERT(loop);

    // This check is inherently racy. The client should not be racing shutdown
    // with attempts to start new threads. This is mainly a sanity check.
    async_loop_state_t state = atomic_load_explicit(&loop->state, memory_order_acquire);
    if (state == ASYNC_LOOP_SHUTDOWN)
        return ZX_ERR_BAD_STATE;

    thread_record_t* rec = calloc(1u, sizeof(*rec));
    if (!rec)
        return ZX_ERR_NO_MEMORY;

    if (thrd_create_with_name(&rec->thread, async_loop_run_thread, loop, name) != thrd_success) {
        free(rec);
        return ZX_ERR_NO_MEMORY;
    }

    // Copy the handle before publishing the record: once the record is on
    // |thread_list| and the lock is released, a concurrent
    // async_loop_join_threads() may remove and free it, so |rec| must not be
    // dereferenced after this point.
    const thrd_t thread = rec->thread;

    mtx_lock(&loop->lock);
    list_add_tail(&loop->thread_list, &rec->node);
    mtx_unlock(&loop->lock);

    if (out_thread)
        *out_thread = thread;
    return ZX_OK;
}
// Joins every thread recorded in |thread_list|, oldest first, freeing each
// record before joining. The lock is released around each join, so records
// added in the meantime are picked up by a later iteration.
void async_loop_join_threads(async_loop_t* loop) {
    ZX_DEBUG_ASSERT(loop);

    mtx_lock(&loop->lock);
    thread_record_t* rec;
    while ((rec = (thread_record_t*)list_remove_head(&loop->thread_list)) != NULL) {
        mtx_unlock(&loop->lock);

        // The record is ours now; keep only the handle.
        thrd_t thread = rec->thread;
        free(rec);

        int result = thrd_join(thread, NULL);
        ZX_DEBUG_ASSERT(result == thrd_success);

        mtx_lock(&loop->lock);
    }
    mtx_unlock(&loop->lock);
}