static void s_event_thread_main()

in source/bsd/kqueue_event_loop.c [811:981]


static void s_event_thread_main(void *user_data) {
    struct aws_event_loop *event_loop = user_data;
    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: main loop started", (void *)event_loop);
    struct kqueue_loop *impl = event_loop->impl_data;

    /* set thread id to the event-loop's thread. */
    aws_atomic_store_ptr(&impl->running_thread_id, &impl->thread_created_on.thread_id);

    AWS_ASSERT(impl->thread_data.state == EVENT_THREAD_STATE_READY_TO_RUN);
    impl->thread_data.state = EVENT_THREAD_STATE_RUNNING;

    struct kevent kevents[MAX_EVENTS];

    /* A single aws_io_handle could have two separate kevents if subscribed for both read and write.
     * If both the read and write kevents fire in the same loop of the event-thread,
     * combine the event-flags and deliver them in a single callback.
     * This makes the kqueue_event_loop behave more like the other platform implementations. */
    struct handle_data *io_handle_events[MAX_EVENTS];

    struct timespec timeout = {
        .tv_sec = DEFAULT_TIMEOUT_SEC,
        .tv_nsec = 0,
    };

    AWS_LOGF_INFO(
        AWS_LS_IO_EVENT_LOOP,
        "id=%p: default timeout %ds, and max events to process per tick %d",
        (void *)event_loop,
        DEFAULT_TIMEOUT_SEC,
        MAX_EVENTS);

    while (impl->thread_data.state == EVENT_THREAD_STATE_RUNNING) {
        int num_io_handle_events = 0;
        bool should_process_cross_thread_data = false;

        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP,
            "id=%p: waiting for a maximum of %ds %lluns",
            (void *)event_loop,
            (int)timeout.tv_sec,
            (unsigned long long)timeout.tv_nsec);

        /* Process kqueue events */
        int num_kevents = kevent(
            impl->kq_fd, NULL /*changelist*/, 0 /*nchanges*/, kevents /*eventlist*/, MAX_EVENTS /*nevents*/, &timeout);

        aws_event_loop_register_tick_start(event_loop);
        AWS_LOGF_TRACE(
            AWS_LS_IO_EVENT_LOOP, "id=%p: wake up with %d events to process.", (void *)event_loop, num_kevents);
        if (num_kevents == -1) {
            /* Raise an error, in case this is interesting to anyone monitoring,
             * and continue on with this loop. We can't process events,
             * but we can still process scheduled tasks */
            aws_raise_error(AWS_ERROR_SYS_CALL_FAILURE);

            /* Force the cross_thread_data to be processed.
             * There might be valuable info in there, like the message to stop the thread.
             * It's fine to do this even if nothing has changed, it just costs a mutex lock/unlock. */
            should_process_cross_thread_data = true;
        }

        for (int i = 0; i < num_kevents; ++i) {
            struct kevent *kevent = &kevents[i];

            /* Was this event to signal that cross_thread_data has changed? */
            if ((int)kevent->ident == impl->cross_thread_signal_pipe[READ_FD]) {
                should_process_cross_thread_data = true;

                /* Drain whatever data was written to the signaling pipe */
                uint32_t read_whatever;
                while (read((int)kevent->ident, &read_whatever, sizeof(read_whatever)) > 0) {
                }

                continue;
            }

            /* Otherwise this was a normal event on a subscribed handle. Figure out which flags to report. */
            int event_flags = s_aws_event_flags_from_kevent(kevent);
            if (event_flags == 0) {
                continue;
            }

            /* Combine flags, in case multiple kevents correspond to one handle. (see notes at top of function) */
            struct handle_data *handle_data = kevent->udata;
            if (handle_data->events_this_loop == 0) {
                io_handle_events[num_io_handle_events++] = handle_data;
            }
            handle_data->events_this_loop |= event_flags;
        }

        /* Invoke each handle's event callback (unless the handle has been unsubscribed) */
        for (int i = 0; i < num_io_handle_events; ++i) {
            struct handle_data *handle_data = io_handle_events[i];

            if (handle_data->state == HANDLE_STATE_SUBSCRIBED) {
                AWS_LOGF_TRACE(
                    AWS_LS_IO_EVENT_LOOP,
                    "id=%p: activity on fd %d, invoking handler.",
                    (void *)event_loop,
                    handle_data->owner->data.fd);
                handle_data->on_event(
                    event_loop, handle_data->owner, handle_data->events_this_loop, handle_data->on_event_user_data);
            }

            handle_data->events_this_loop = 0;
        }

        /* Process cross_thread_data */
        if (should_process_cross_thread_data) {
            s_process_cross_thread_data(event_loop);
        }

        /* Run scheduled tasks */
        uint64_t now_ns = 0;
        event_loop->clock(&now_ns); /* If clock fails, now_ns will be 0 and tasks scheduled for a specific time
                                       will not be run. That's ok, we'll handle them next time around. */
        AWS_LOGF_TRACE(AWS_LS_IO_EVENT_LOOP, "id=%p: running scheduled tasks.", (void *)event_loop);
        aws_task_scheduler_run_all(&impl->thread_data.scheduler, now_ns);

        /* Set timeout for next kevent() call.
         * If clock fails, or scheduler has no tasks, use default timeout */
        bool use_default_timeout = false;

        int err = event_loop->clock(&now_ns);
        if (err) {
            use_default_timeout = true;
        }

        uint64_t next_run_time_ns;
        if (!aws_task_scheduler_has_tasks(&impl->thread_data.scheduler, &next_run_time_ns)) {

            use_default_timeout = true;
        }

        if (use_default_timeout) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP, "id=%p: no more scheduled tasks using default timeout.", (void *)event_loop);
            timeout.tv_sec = DEFAULT_TIMEOUT_SEC;
            timeout.tv_nsec = 0;
        } else {
            /* Convert from timestamp in nanoseconds, to timeout in seconds with nanosecond remainder */
            uint64_t timeout_ns = next_run_time_ns > now_ns ? next_run_time_ns - now_ns : 0;

            uint64_t timeout_remainder_ns = 0;
            uint64_t timeout_sec =
                aws_timestamp_convert(timeout_ns, AWS_TIMESTAMP_NANOS, AWS_TIMESTAMP_SECS, &timeout_remainder_ns);

            if (timeout_sec > LONG_MAX) { /* Check for overflow. On Darwin, these values are stored as longs */
                timeout_sec = LONG_MAX;
                timeout_remainder_ns = 0;
            }

            AWS_LOGF_TRACE(
                AWS_LS_IO_EVENT_LOOP,
                "id=%p: detected more scheduled tasks with the next occurring at "
                "%llu using timeout of %ds %lluns.",
                (void *)event_loop,
                (unsigned long long)timeout_ns,
                (int)timeout_sec,
                (unsigned long long)timeout_remainder_ns);
            timeout.tv_sec = (time_t)(timeout_sec);
            timeout.tv_nsec = (long)(timeout_remainder_ns);
        }

        aws_event_loop_register_tick_end(event_loop);
    }

    AWS_LOGF_INFO(AWS_LS_IO_EVENT_LOOP, "id=%p: exiting main loop", (void *)event_loop);
    /* reset to NULL. This should be updated again during destroy before tasks are canceled. */
    aws_atomic_store_ptr(&impl->running_thread_id, NULL);
}