in bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_impl.c [116:239]
static celix_status_t celix_earpm_publishEvent(celix_event_admin_remote_provider_mqtt_t* earpm, const char *topic,
const celix_properties_t *eventProps, bool async);
/**
 * @brief Create the MQTT based remote provider for the event admin.
 *
 * Construction steps, in order:
 *  - allocate the provider and create its log helper ("celix_earpm");
 *  - read the framework UUID, the default QoS and the "continuous no ack" threshold
 *    from the bundle context properties and validate them;
 *  - build the topic on which acks for sync events sent by this framework arrive
 *    (CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX followed by the framework UUID);
 *  - create the mutex, the ack condition, the event handler / event subscription /
 *    remote framework maps, the event deliverer and the MQTT client;
 *  - subscribe to the control topics (handler info, session end, sync event ack).
 *
 * Every partially constructed resource is held by a celix_autofree / celix_autoptr
 * local, so any early `return NULL` releases what was set up so far. The declaration
 * order of those locals therefore matters and must be preserved. On success all of
 * them are handed over to the returned object with celix_steal_ptr.
 *
 * @param[in] ctx The bundle context; must not be NULL (asserted).
 * @return The new provider instance, or NULL if any step fails. Failures after the
 *         log helper is available are reported through it.
 */
celix_event_admin_remote_provider_mqtt_t* celix_earpm_create(celix_bundle_context_t* ctx) {
    assert(ctx != NULL);
    celix_autofree celix_event_admin_remote_provider_mqtt_t* earpm = calloc(1, sizeof(*earpm));
    if (earpm == NULL) {
        return NULL;
    }
    earpm->ctx = ctx;
    earpm->lastAckSeqNr = 0;
    earpm->destroying = false;
    celix_autoptr(celix_log_helper_t) logHelper = earpm->logHelper = celix_logHelper_create(ctx, "celix_earpm");
    if (logHelper == NULL) {
        return NULL;
    }
    earpm->fwUUID = celix_bundleContext_getProperty(ctx, CELIX_FRAMEWORK_UUID, NULL);
    if (earpm->fwUUID == NULL) {
        celix_logHelper_error(logHelper, "Failed to get framework UUID.");
        return NULL;
    }

    // Validate the configured QoS as a long *before* narrowing it to the enum type; casting first
    // would let an out-of-range value wrap into the valid range and slip through the check.
    const long defaultQos = celix_bundleContext_getPropertyAsLong(ctx, CELIX_EARPM_EVENT_DEFAULT_QOS, CELIX_EARPM_EVENT_DEFAULT_QOS_DEFAULT);
    if (defaultQos <= CELIX_EARPM_QOS_UNKNOWN || defaultQos >= CELIX_EARPM_QOS_MAX) {
        celix_logHelper_error(logHelper, "Invalid default QOS(%ld) value.", defaultQos);
        return NULL;
    }
    earpm->defaultQos = (celix_earpm_qos_e)defaultQos;

    // Same for the threshold: it must be positive and representable as int. The round-trip
    // comparison rejects values that would not survive the narrowing conversion.
    const long noAckThreshold = celix_bundleContext_getPropertyAsLong(ctx, CELIX_EARPM_SYNC_EVENT_CONTINUOUS_NO_ACK_THRESHOLD, CELIX_EARPM_SYNC_EVENT_CONTINUOUS_NO_ACK_THRESHOLD_DEFAULT);
    if (noAckThreshold <= 0 || (long)(int)noAckThreshold != noAckThreshold) {
        celix_logHelper_error(logHelper, "Invalid continuous no ack threshold(%ld) value.", noAckThreshold);
        return NULL;
    }
    earpm->continuousNoAckThreshold = (int)noAckThreshold;

    if (asprintf(&earpm->syncEventAckTopic, CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX"%s", earpm->fwUUID) < 0) {
        celix_logHelper_error(logHelper, "Failed to create sync event response topic.");
        return NULL;
    }
    // Guard the topic string only after asprintf succeeded; on failure there is nothing to free.
    celix_autofree char* syncEventAckTopic = earpm->syncEventAckTopic;
    celix_status_t status = celixThreadMutex_create(&earpm->mutex, NULL);
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(logHelper, "Failed to create mutex, %d.", status);
        return NULL;
    }
    celix_autoptr(celix_thread_mutex_t) mutex = &earpm->mutex;
    status = celixThreadCondition_init(&earpm->ackCond, NULL);
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(logHelper, "Failed to create condition for event ack, %d.", status);
        return NULL;
    }
    celix_autoptr(celix_thread_cond_t) ackCond = &earpm->ackCond;

    // Map of local event handlers; removed entries are released with celix_earpm_eventHandlerDestroy.
    celix_autoptr(celix_long_hash_map_t) eventHandlers = NULL;
    {
        celix_long_hash_map_create_options_t opts = CELIX_EMPTY_LONG_HASH_MAP_CREATE_OPTIONS;
        opts.simpleRemovedCallback = (void *)celix_earpm_eventHandlerDestroy;
        eventHandlers = earpm->eventHandlers = celix_longHashMap_createWithOptions(&opts);
        if (eventHandlers == NULL) {
            celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR);
            celix_logHelper_error(logHelper, "Failed to create local event handler map.");
            return NULL;
        }
    }

    // Map of event subscriptions; removed entries are released with celix_earpm_subscriptionDestroy.
    celix_autoptr(celix_string_hash_map_t) eventSubscriptions = NULL;
    {
        celix_string_hash_map_create_options_t opts = CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS;
        opts.simpleRemovedCallback = (void *)celix_earpm_subscriptionDestroy;
        eventSubscriptions = earpm->eventSubscriptions = celix_stringHashMap_createWithOptions(&opts);
        if (eventSubscriptions == NULL) {
            celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR);
            celix_logHelper_error(logHelper, "Failed to create event subscription map for local event handler.");
            return NULL;
        }
    }

    // Map of remote framework information; removed entries are released with
    // celix_earpm_remoteFrameworkInfoDestroy.
    celix_autoptr(celix_string_hash_map_t) remoteFrameworks = NULL;
    {
        celix_string_hash_map_create_options_t opts = CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS;
        opts.simpleRemovedCallback = (void*)celix_earpm_remoteFrameworkInfoDestroy;
        remoteFrameworks = earpm->remoteFrameworks = celix_stringHashMap_createWithOptions(&opts);
        if (remoteFrameworks == NULL) {
            celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR);
            celix_logHelper_error(logHelper, "Failed to create remote framework information map.");
            return NULL;
        }
    }

    celix_autoptr(celix_earpm_event_deliverer_t) deliverer = earpm->deliverer = celix_earpmDeliverer_create(ctx, logHelper);
    if (deliverer == NULL) {
        celix_logHelper_error(logHelper, "Failed to create event deliverer.");
        return NULL;
    }

    // The MQTT client calls back into this provider (callbackHandle) for received messages and
    // for connection establishment.
    celix_earpm_client_create_options_t opts;
    memset(&opts, 0, sizeof(opts));
    opts.ctx = ctx;
    opts.logHelper = logHelper;
    opts.sessionEndMsgTopic = CELIX_EARPM_SESSION_END_TOPIC;
    opts.sessionEndMsgSenderUUID = earpm->fwUUID;
    opts.sessionEndMsgVersion = CELIX_EARPM_MSG_VERSION;
    opts.callbackHandle = earpm;
    opts.receiveMsgCallback = celix_earpm_receiveMsgCallback;
    opts.connectedCallback = celix_earpm_connectedCallback;
    celix_autoptr(celix_earpm_client_t) mqttClient = earpm->mqttClient = celix_earpmClient_create(&opts);
    if (mqttClient == NULL) {
        celix_logHelper_error(logHelper, "Failed to create mqtt client.");
        return NULL;
    }

    // Subscribe to the control topics. CELIX_DO_IF is expected to skip the remaining calls once
    // status is no longer CELIX_SUCCESS, so the first failure is the one that gets reported.
    status = celix_earpmClient_subscribe(earpm->mqttClient, CELIX_EARPM_HANDLER_INFO_TOPIC_PREFIX"*", CELIX_EARPM_QOS_AT_LEAST_ONCE);
    status = CELIX_DO_IF(status, celix_earpmClient_subscribe(earpm->mqttClient, CELIX_EARPM_SESSION_END_TOPIC, CELIX_EARPM_QOS_AT_LEAST_ONCE));
    status = CELIX_DO_IF(status, celix_earpmClient_subscribe(earpm->mqttClient, syncEventAckTopic, CELIX_EARPM_QOS_AT_LEAST_ONCE));
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(logHelper, "Failed to subscribe control message. %d.", status);
        return NULL;
    }

    // Success: disarm every cleanup guard so the resources stay owned by the returned provider.
    celix_steal_ptr(mqttClient);
    celix_steal_ptr(deliverer);
    celix_steal_ptr(remoteFrameworks);
    celix_steal_ptr(eventSubscriptions);
    celix_steal_ptr(eventHandlers);
    celix_steal_ptr(ackCond);
    celix_steal_ptr(syncEventAckTopic);
    celix_steal_ptr(mutex);
    celix_steal_ptr(logHelper);
    return celix_steal_ptr(earpm);
}