in source/bus.c [586:674]
/*
 * Initializes the asynchronous implementation of a bus.
 *
 * Allocates a zeroed bus_async_impl and stores it in bus->impl, then sets up,
 * in order: the message queue (mutex, lists and optional preallocated message
 * buffer), the subscription hash table, the dispatch condition variable and the
 * delivery thread. The call blocks until dispatch.started becomes non-zero.
 *
 * bus:     bus being initialized; bus->allocator is used for every allocation.
 * options: options->policy selects reliable delivery when it equals
 *          AWS_BUS_ASYNC_RELIABLE; options->buffer_size (bytes, may be 0) sizes
 *          the preallocated message buffer.
 *
 * There is no return value. On any failure the error is logged, everything set
 * up so far is released, and bus->impl is left NULL.
 */
static void s_bus_async_init(struct aws_bus *bus, const struct aws_bus_options *options) {
    struct bus_async_impl *impl = bus->impl = aws_mem_calloc(bus->allocator, 1, sizeof(struct bus_async_impl));
    impl->vtable = bus_async_vtable;
    impl->reliable = (options->policy == AWS_BUS_ASYNC_RELIABLE);

    /* init msg queue */
    if (aws_mutex_init(&impl->queue.mutex)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize queue synchronization: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }
    aws_linked_list_init(&impl->queue.msgs);
    aws_linked_list_init(&impl->queue.free);
    aws_linked_list_init(&impl->queue.subs);

    /* push as many bus_messages as we can into the free list from the buffer */
    if (options->buffer_size) {
        impl->queue.buffer = aws_mem_calloc(bus->allocator, 1, options->buffer_size);
        impl->queue.buffer_end = ((uint8_t *)impl->queue.buffer) + options->buffer_size;

        /* Only whole messages are used; any trailing bytes of the buffer stay unused.
         * size_t avoids truncating the count for very large buffers. */
        struct bus_message *msgs = impl->queue.buffer;
        const size_t msg_count = options->buffer_size / sizeof(struct bus_message);
        for (size_t msg_idx = 0; msg_idx < msg_count; ++msg_idx) {
            aws_linked_list_push_back(&impl->queue.free, &msgs[msg_idx].list_node);
        }
    }

    /* init subscription table */
    if (aws_hash_table_init(
            &impl->slots.table, bus->allocator, 8, aws_hash_ptr, aws_ptr_eq, NULL, s_bus_destroy_listener_list)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize bus addressing table: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* Setup dispatch thread */
    if (aws_condition_variable_init(&impl->dispatch.notify)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize async notify: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    if (aws_thread_init(&impl->dispatch.thread, bus->allocator)) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to initialize background thread: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* Dispatch state is set before the launch so the new thread never observes it unset. */
    impl->dispatch.running = true;
    aws_atomic_init_int(&impl->dispatch.started, 0);
    aws_atomic_init_int(&impl->dispatch.exited, 0);

    if (aws_thread_launch(&impl->dispatch.thread, s_bus_async_deliver, bus, aws_default_thread_options())) {
        AWS_LOGF_ERROR(
            AWS_LS_COMMON_BUS,
            "bus %p: Unable to launch delivery thread: %s",
            (void *)bus,
            aws_error_name(aws_last_error()));
        goto error;
    }

    /* wait for dispatch thread to start before returning control */
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Waiting for delivery thread to start", (void *)bus);
    /* NOTE(review): dispatch.started is presumably set by s_bus_async_deliver - confirm.
     * The sleep argument is in nanoseconds, so this polls roughly once per millisecond. */
    while (!aws_atomic_load_int(&impl->dispatch.started)) {
        aws_thread_current_sleep(1000 * 1000);
    }
    AWS_LOGF_TRACE(AWS_LS_COMMON_BUS, "bus %p: Delivery thread started", (void *)bus);

    return;

error:
    /* Single cleanup path for every failure point, so some members may never have been
     * initialized. impl was zero-allocated.
     * NOTE(review): relies on the clean_up functions tolerating zeroed structs - confirm. */
    aws_thread_clean_up(&impl->dispatch.thread);
    aws_condition_variable_clean_up(&impl->dispatch.notify);
    aws_hash_table_clean_up(&impl->slots.table);
    /* Release the buffer itself, not the address of the pointer field inside impl.
     * The buffer is NULL when buffer_size was 0 or the failure happened before it was
     * allocated; aws_mem_release accepts NULL. */
    aws_mem_release(bus->allocator, impl->queue.buffer);
    aws_mutex_clean_up(&impl->queue.mutex);
    aws_mem_release(bus->allocator, impl);
    bus->impl = NULL;
}