static void s_bus_async_init()

in source/bus.c [586:674]


/**
 * Initializes the asynchronous implementation of a bus.
 *
 * Allocates the bus_async_impl, sets up the message queue (mutex, message/free/subscription
 * lists and an optional pre-allocated pool of bus_messages), the subscription hash table and
 * the dispatch condition variable, then launches the delivery thread and blocks until that
 * thread has flagged itself as started.
 *
 * @param bus     Bus being initialized; bus->allocator is used for every allocation and
 *                bus->impl is overwritten by this function.
 * @param options Bus options; `policy` selects reliable delivery and `buffer_size` (in bytes)
 *                sizes the pre-allocated message pool (0 means no pool is allocated).
 *
 * This function returns void: on any failure the error is logged, everything set up so far is
 * torn down and bus->impl is left NULL.
 * NOTE(review): presumably callers detect failure by checking bus->impl — confirm at call site.
 */
static void s_bus_async_init(struct aws_bus *bus, const struct aws_bus_options *options) {
    /* Zero-initialized, so the error path below can run clean-up on members that were never set up */
    struct bus_async_impl *impl = bus->impl = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_async_impl));
    impl->vtable = bus_async_vtable;
    impl->reliable = (options->policy == AWS_BUS_ASYNC_RELIABLE);

    /* init msg queue */
    if (aws_mutex_init(&impl->queue.mutex)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize queue synchronization: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }
    aws_linked_list_init(&impl->queue.msgs);
    aws_linked_list_init(&impl->queue.free);
    aws_linked_list_init(&impl->queue.subs);

    /* push as many bus_messages as we can into the free list from the buffer */
    if (options->buffer_size) {
        impl->queue.buffer = aws_mem_calloc(bus->allocator, 1, options->buffer_size);
        /* one-past-the-end marker of the pool */
        impl->queue.buffer_end = ((uint8_t *)impl->queue.buffer) + options->buffer_size;

        /* size_t (not int) so that very large buffers cannot truncate/overflow the message count;
         * any trailing bytes that do not fit a whole bus_message are simply unused */
        const size_t msg_count = options->buffer_size / sizeof(struct bus_message);
        for (size_t msg_idx = 0; msg_idx < msg_count; ++msg_idx) {
            struct bus_message *msg =
                (struct bus_message *)(((uint8_t *)impl->queue.buffer) + msg_idx * sizeof(struct bus_message));
            aws_linked_list_push_back(&impl->queue.free, &msg->list_node);
        }
    }

    /* init subscription table */
    if (aws_hash_table_init(
            &impl->slots.table, bus->allocator, 8, aws_hash_ptr, aws_ptr_eq, NULL, s_bus_destroy_listener_list)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize bus addressing table: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* Setup dispatch thread */
    if (aws_condition_variable_init(&impl->dispatch.notify)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize async notify: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    if (aws_thread_init(&impl->dispatch.thread, bus->allocator)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize background thread: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* All dispatch state must be in place before the thread is launched, since the thread
     * receives the bus as its argument */
    impl->dispatch.running = true;
    aws_atomic_init_int(&impl->dispatch.started, 0);
    aws_atomic_init_int(&impl->dispatch.exited, 0);
    if (aws_thread_launch(&impl->dispatch.thread, s_bus_async_deliver, bus, aws_default_thread_options())) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to launch delivery thread: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* wait for dispatch thread to start before returning control */
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Waiting for delivery thread to start", (void *)bus);
    while (!aws_atomic_load_int(&impl->dispatch.started)) {
        /* poll `started`, sleeping between checks rather than spinning */
        aws_thread_current_sleep(1000 * 1000);
    }
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Delivery thread started", (void *)bus);

    return;

error:
    /* Tear down in reverse order of construction */
    aws_thread_clean_up(&impl->dispatch.thread);
    aws_condition_variable_clean_up(&impl->dispatch.notify);
    aws_hash_table_clean_up(&impl->slots.table);
    /* Release the pool itself, NOT the address of the pointer field (which lives inside impl).
     * The pool only exists when options->buffer_size was non-zero. */
    if (impl->queue.buffer) {
        aws_mem_release(bus->allocator, impl->queue.buffer);
        impl->queue.buffer = NULL;
    }
    aws_mutex_clean_up(&impl->queue.mutex);
    aws_mem_release(bus->allocator, impl);
    bus->impl = NULL;
}