in source/mqtt_client_connection.c [1263:1354]
napi_value aws_napi_mqtt_client_connection_subscribe(napi_env env, napi_callback_info cb_info) {
napi_value node_args[5];
size_t num_args = AWS_ARRAY_SIZE(node_args);
napi_value *arg = &node_args[0];
AWS_NAPI_CALL(env, napi_get_cb_info(env, cb_info, &num_args, node_args, NULL, NULL), {
napi_throw_error(env, NULL, "Failed to retreive callback information");
return NULL;
});
if (num_args != AWS_ARRAY_SIZE(node_args)) {
napi_throw_error(env, NULL, "mqtt_client_connection_subscribe needs exactly 5 arguments");
return NULL;
}
napi_value node_binding = *arg++;
struct mqtt_connection_binding *binding = NULL;
AWS_NAPI_CALL(env, napi_get_value_external(env, node_binding, (void **)&binding), {
napi_throw_error(env, NULL, "Failed to extract binding from external");
return NULL;
});
struct aws_allocator *allocator = binding->allocator;
struct suback_args *suback = NULL;
struct subscription *sub = aws_mem_calloc(allocator, 1, sizeof(struct subscription));
AWS_FATAL_ASSERT(sub);
sub->allocator = allocator;
napi_value node_topic = *arg++;
AWS_NAPI_CALL(env, aws_byte_buf_init_from_napi(&sub->topic, env, node_topic), {
napi_throw_type_error(env, NULL, "topic must be a String");
goto cleanup;
});
napi_value node_qos = *arg++;
uint32_t qos_uint = 0;
AWS_NAPI_CALL(env, napi_get_value_uint32(env, node_qos, &qos_uint), {
napi_throw_type_error(env, NULL, "qos must be a number");
goto cleanup;
});
enum aws_mqtt_qos qos = (enum aws_mqtt_qos)qos_uint;
napi_value node_on_publish = *arg++;
if (!aws_napi_is_null_or_undefined(env, node_on_publish)) {
AWS_NAPI_CALL(
env,
aws_napi_create_threadsafe_function(
env,
node_on_publish,
"aws_mqtt_client_connection_on_publish",
s_on_publish_call,
binding,
&sub->on_publish),
{ goto cleanup; });
}
napi_value node_on_suback = *arg++;
if (!aws_napi_is_null_or_undefined(env, node_on_suback)) {
suback = aws_mem_calloc(allocator, 1, sizeof(struct suback_args));
AWS_FATAL_ASSERT(suback);
suback->allocator = allocator;
suback->binding = binding;
aws_byte_buf_init_copy_from_cursor(&suback->topic, allocator, aws_byte_cursor_from_buf(&sub->topic));
AWS_NAPI_CALL(
env,
aws_napi_create_threadsafe_function(
env,
node_on_suback,
"aws_mqtt_client_connection_on_suback",
s_on_suback_call,
binding,
&suback->on_suback),
{ goto cleanup; });
}
struct aws_byte_cursor topic_cur = aws_byte_cursor_from_buf(&sub->topic);
uint16_t sub_id = aws_mqtt_client_connection_subscribe(
binding->connection, &topic_cur, qos, s_on_publish, sub, s_on_publish_user_data_clean_up, s_on_suback, suback);
if (!sub_id) {
aws_napi_throw_last_error(env);
goto cleanup;
}
return NULL;
cleanup:
s_destroy_subscription(sub);
s_destroy_suback_args(suback);
return NULL;
}