static celix_status_t celix_earpm_publishEvent()

in bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_impl.c [116:239]


/*
 * Forward declaration of a file-local (static) function, so that code appearing before its definition can call it.
 * NOTE(review): the definition is not visible in this section. Judging by the parameter names it publishes an event
 * for `topic` carrying `eventProps` through `earpm`, with `async` selecting asynchronous publishing - confirm
 * against the definition before relying on this.
 */
static celix_status_t celix_earpm_publishEvent(celix_event_admin_remote_provider_mqtt_t* earpm, const char *topic,
                                               const celix_properties_t *eventProps, bool async);


/**
 * @brief Create the MQTT remote provider instance for the event admin.
 *
 * Steps, in order:
 *  - allocate the zero-initialised instance and create its log helper;
 *  - read the framework UUID, the default event QOS and the continuous no-ack threshold from the bundle context
 *    and validate them;
 *  - build the sync event ack topic from the ack topic prefix and the framework UUID;
 *  - create the mutex, the ack condition, the event handler / event subscription / remote framework maps,
 *    the event deliverer and the MQTT client;
 *  - subscribe to the control topics (handler info, session end, sync event ack).
 *
 * Every resource is additionally held by a scope-bound cleanup local (celix_autofree / celix_autoptr), so each
 * early `return NULL` releases what was created so far. The declaration order of these locals is significant and
 * must be kept when editing this function. On success ownership is handed to the returned instance with
 * celix_steal_ptr().
 *
 * @param[in] ctx The bundle context. Must not be NULL (asserted).
 * @return The newly created provider, or NULL if allocation, configuration validation, resource creation or a
 *         control topic subscription fails.
 */
celix_event_admin_remote_provider_mqtt_t* celix_earpm_create(celix_bundle_context_t* ctx) {
    assert(ctx != NULL);
    celix_autofree celix_event_admin_remote_provider_mqtt_t* earpm = calloc(1, sizeof(*earpm));
    if (earpm == NULL) {
        return NULL;
    }
    earpm->ctx = ctx;
    earpm->lastAckSeqNr = 0;
    earpm->destroying = false;
    celix_autoptr(celix_log_helper_t) logHelper = earpm->logHelper = celix_logHelper_create(ctx, "celix_earpm");
    if (logHelper == NULL) {
        return NULL;
    }
    earpm->fwUUID = celix_bundleContext_getProperty(ctx, CELIX_FRAMEWORK_UUID, NULL);
    if (earpm->fwUUID == NULL) {
        celix_logHelper_error(logHelper, "Failed to get framework UUID.");
        return NULL;
    }

    // Validate the configured value while it is still a long: narrowing first could turn an out-of-range value
    // (e.g. 4294967297) into a seemingly valid one.
    const long defaultQos = celix_bundleContext_getPropertyAsLong(ctx, CELIX_EARPM_EVENT_DEFAULT_QOS, CELIX_EARPM_EVENT_DEFAULT_QOS_DEFAULT);
    if (defaultQos <= CELIX_EARPM_QOS_UNKNOWN || defaultQos >= CELIX_EARPM_QOS_MAX) {
        celix_logHelper_error(logHelper, "Invalid default QOS(%ld) value.", defaultQos);
        return NULL;
    }
    earpm->defaultQos = (celix_earpm_qos_e)defaultQos;

    // Same reasoning as above: the threshold is stored as int, so reject non-positive values and values that do
    // not survive the round trip through int before narrowing.
    const long noAckThreshold = celix_bundleContext_getPropertyAsLong(ctx, CELIX_EARPM_SYNC_EVENT_CONTINUOUS_NO_ACK_THRESHOLD, CELIX_EARPM_SYNC_EVENT_CONTINUOUS_NO_ACK_THRESHOLD_DEFAULT);
    if (noAckThreshold <= 0 || (long)(int)noAckThreshold != noAckThreshold) {
        celix_logHelper_error(logHelper, "Invalid continuous no ack threshold(%ld) value.", noAckThreshold);
        return NULL;
    }
    earpm->continuousNoAckThreshold = (int)noAckThreshold;

    if (asprintf(&earpm->syncEventAckTopic, CELIX_EARPM_SYNC_EVENT_ACK_TOPIC_PREFIX"%s", earpm->fwUUID) < 0) {
        celix_logHelper_error(logHelper, "Failed to create sync event response topic.");
        return NULL;
    }
    // From here on the topic string is released automatically on every failure path.
    celix_autofree char* syncEventAckTopic = earpm->syncEventAckTopic;
    celix_status_t status = celixThreadMutex_create(&earpm->mutex, NULL);
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(logHelper, "Failed to create mutex, %d.", status);
        return NULL;
    }
    celix_autoptr(celix_thread_mutex_t) mutex = &earpm->mutex;
    status = celixThreadCondition_init(&earpm->ackCond, NULL);
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(logHelper, "Failed to create condition for event ack, %d.", status);
        return NULL;
    }
    celix_autoptr(celix_thread_cond_t) ackCond = &earpm->ackCond;
    celix_autoptr(celix_long_hash_map_t) eventHandlers = NULL;
    {
        celix_long_hash_map_create_options_t opts = CELIX_EMPTY_LONG_HASH_MAP_CREATE_OPTIONS;
        opts.simpleRemovedCallback = (void *)celix_earpm_eventHandlerDestroy;
        eventHandlers = earpm->eventHandlers = celix_longHashMap_createWithOptions(&opts);
        if (eventHandlers == NULL) {
            celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR);
            celix_logHelper_error(logHelper, "Failed to create local event handler map.");
            return NULL;
        }
    }
    celix_autoptr(celix_string_hash_map_t) eventSubscriptions = NULL;
    {
        celix_string_hash_map_create_options_t opts = CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS;
        opts.simpleRemovedCallback = (void *)celix_earpm_subscriptionDestroy;
        eventSubscriptions = earpm->eventSubscriptions = celix_stringHashMap_createWithOptions(&opts);
        if (eventSubscriptions == NULL) {
            celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR);
            celix_logHelper_error(logHelper, "Failed to create event subscription map for local event handler.");
            return NULL;
        }
    }
    celix_autoptr(celix_string_hash_map_t) remoteFrameworks = NULL;
    {
        celix_string_hash_map_create_options_t opts = CELIX_EMPTY_STRING_HASH_MAP_CREATE_OPTIONS;
        opts.simpleRemovedCallback = (void*)celix_earpm_remoteFrameworkInfoDestroy;
        remoteFrameworks = earpm->remoteFrameworks = celix_stringHashMap_createWithOptions(&opts);
        if (remoteFrameworks == NULL) {
            celix_logHelper_logTssErrors(logHelper, CELIX_LOG_LEVEL_ERROR);
            celix_logHelper_error(logHelper, "Failed to create remote framework information map.");
            return NULL;
        }
    }
    celix_autoptr(celix_earpm_event_deliverer_t) deliverer = earpm->deliverer = celix_earpmDeliverer_create(ctx, logHelper);
    if (deliverer == NULL) {
        celix_logHelper_error(logHelper, "Failed to create event deliverer.");
        return NULL;
    }
    // The MQTT client is created last: it is handed `earpm` as callback handle, so everything the callbacks
    // may touch has to exist already.
    celix_earpm_client_create_options_t opts;
    memset(&opts, 0, sizeof(opts));
    opts.ctx = ctx;
    opts.logHelper = logHelper;
    opts.sessionEndMsgTopic = CELIX_EARPM_SESSION_END_TOPIC;
    opts.sessionEndMsgSenderUUID = earpm->fwUUID;
    opts.sessionEndMsgVersion = CELIX_EARPM_MSG_VERSION;
    opts.callbackHandle = earpm;
    opts.receiveMsgCallback = celix_earpm_receiveMsgCallback;
    opts.connectedCallback = celix_earpm_connectedCallback;
    celix_autoptr(celix_earpm_client_t) mqttClient = earpm->mqttClient = celix_earpmClient_create(&opts);
    if (mqttClient == NULL) {
        celix_logHelper_error(logHelper, "Failed to create mqtt client.");
        return NULL;
    }
    // Subscribe to the control topics; CELIX_DO_IF skips the remaining subscriptions after the first failure.
    status = celix_earpmClient_subscribe(earpm->mqttClient, CELIX_EARPM_HANDLER_INFO_TOPIC_PREFIX"*", CELIX_EARPM_QOS_AT_LEAST_ONCE);
    status = CELIX_DO_IF(status, celix_earpmClient_subscribe(earpm->mqttClient, CELIX_EARPM_SESSION_END_TOPIC, CELIX_EARPM_QOS_AT_LEAST_ONCE));
    status = CELIX_DO_IF(status, celix_earpmClient_subscribe(earpm->mqttClient, syncEventAckTopic, CELIX_EARPM_QOS_AT_LEAST_ONCE));
    if (status != CELIX_SUCCESS) {
        celix_logHelper_error(logHelper, "Failed to subscribe control message. %d.", status);
        return NULL;
    }

    // Success: disarm every cleanup guard so the resources stay owned by the returned instance.
    celix_steal_ptr(mqttClient);
    celix_steal_ptr(deliverer);
    celix_steal_ptr(remoteFrameworks);
    celix_steal_ptr(eventSubscriptions);
    celix_steal_ptr(eventHandlers);
    celix_steal_ptr(ackCond);
    celix_steal_ptr(syncEventAckTopic);
    celix_steal_ptr(mutex);
    celix_steal_ptr(logHelper);

    return celix_steal_ptr(earpm);
}