in source/bsd/kqueue_event_loop.c [811:981]
static void s_event_thread_main(void *user_data) {
    struct aws_event_loop *event_loop = user_data;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* set thread id to the event-loop's thread. */
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);

    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

    struct kevent kevents[MAX_EVENTS];

    /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
     * If both the read and write kevents fire in the same loop of the event-thread,
     * combine the event-flags and deliver them in a single callback.
     * This makes the kqueue_event_loop behave more like the other platform implementations. */
    struct handle_data *io_handle_events[MAX_EVENTS];

    struct timespec timeout = {
        .tv_sec = DEFAULT_TIMEOUT_SEC,
        .tv_nsec = 0,
    };

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %ds, and max events to process per tick %d",
        (void *)event_loop,
        DEFAULT_TIMEOUT_SEC,
        MAX_EVENTS);

    while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
        int num_io_handle_events = 0;
        bool should_process_cross_thread_data = false;

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: waiting for a maximum of %ds %lluns",
            (void *)event_loop,
            (int)timeout.tv_sec,
            (unsigned long long)timeout.tv_nsec);

        /* Process kqueue events */
        int num_kevents = kevent(
            impl->kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, &timeout);
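
        /* Tick start/end bracket the work done in this iteration; the call below pairs with
         * aws_event_loop_register_tick_end() at the bottom of the loop, so the blocking kevent()
         * wait above falls outside the bracket. */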
        aws_event_loop_register_tick_start(event_loop);

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);

        if (num_kevents == -1) {
            /* Raise an error, in case this is interesting to anyone monitoring,
             * and continue on with this loop. We can't process events,
             * but we can still process scheduled tasks */
            aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);

            /* Force the cross_thread_data to be processed.
             * There might be valuable info in there, like the message to stop the thread.
             * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
            should_process_cross_thread_data = true;
        }

        for (int i = 0; i < num_kevents; ++i) {
            struct kevent *kevent = &kevents[i];

            /* Was this event to signal that cross_thread_data has changed? */
            if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
                should_process_cross_thread_data = true;

                /* Drain whatever data was written to the signaling pipe */
                uint32_t read_whatever;
                while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
                }

                continue;
            }

            /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
            int event_flags = s_aws_event_flags_from_kevent(kevent);
            if (event_flags == 0) {
                continue;
            }

            /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
            struct handle_data *handle_data = kevent->udata;
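            /* Queue the handle for a callback only the first time it shows up this iteration;
             * any further kevents for the same handle just OR in their flags. */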
            if (handle_data->events_this_loop == 0) {
                io_handle_events[num_io_handle_events++] = handle_data;
            }

            handle_data->events_this_loop |= event_flags;
        }

        /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
        for (int i = 0; i < num_io_handle_events; ++i) {
            struct handle_data *handle_data = io_handle_events[i];

            if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    handle_data->owner->data.fd);
                handle_data->on_event(
                    event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
            }
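
            /* Clear the accumulated flags whether or not the callback ran, so stale events
             * don't carry over into the next iteration. */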
            handle_data->events_this_loop = 0;
        }

        /* Process cross_thread_data */
        if (should_process_cross_thread_data) {
            s_process_cross_thread_data(event_loop);
        }

        /* Run scheduled tasks */
        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);

        /* Set timeout for next kevent() call.
         * If clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;
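        /* Query the clock again: running tasks above may have taken time, so the next timeout
         * is computed from a fresh reading of 'now'. */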
        int err = event_loop->clock(&now_ns);
        if (err) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {
            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);

            timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
            timeout.tv_nsec = 0;
        } else {
            /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
            uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;

            uint64_t timeout_remainder_ns = 0;
            uint64_t timeout_sec =
                aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);

            if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
                timeout_sec = LONG_MAX;
                timeout_remainder_ns = 0;
            }

            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu using timeout of %ds %lluns.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                (int)timeout_sec,
                (unsigned long long)timeout_remainder_ns);

            timeout.tv_sec = (time_t)(timeout_sec);
            timeout.tv_nsec = (long)(timeout_remainder_ns);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);

    /* reset to NULL. This should be updated again during destroy, before tasks are canceled. */
    aws_atomic_store_ptr(&impl->running_thread_id, NULL);
}