static void celix_earpmClient_messageCallback()

in bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.c [999:1070]


/**
 * @brief Message callback: turns a received MQTT message into a request info and passes it on.
 *
 * The topic, payload, payload size and QoS are taken from @p message. The message expiry interval,
 * response topic, correlation data and the sender-UUID / message-version user properties are taken
 * from @p props when present. The assembled request info is then handed to
 * client->receiveMsgCallback together with client->callbackHandle.
 *
 * If a response topic, correlation data or user property cannot be read, an error is logged and the
 * message is dropped without invoking the receive callback.
 *
 * NOTE(review): presumably registered as the mosquitto MQTT v5 on-message callback - confirm at the
 * registration site. The extracted strings/binary are declared celix_autofree, so the pointers placed
 * in the request info are assumed to be valid only for the duration of the receive callback.
 *
 * @param mosq Unused.
 * @param handle The celix_earpm_client_t instance; must not be NULL.
 * @param message The received message; must not be NULL and must have a topic.
 * @param props The property list of the message; may be NULL.
 */
static void celix_earpmClient_messageCallback(struct mosquitto* mosq CELIX_UNUSED, void* handle,
        const struct mosquitto_message* message, const mosquitto_property* props) {
    assert(handle != NULL);
    assert(message != NULL);
    assert(message->topic != NULL);
    celix_earpm_client_t* self = handle;

    celix_logHelper_trace(self->logHelper, "Received message on topic %s.", message->topic);

    // Start from an all-zero request and fill in what the message itself carries.
    celix_earpm_client_request_info_t info;
    memset(&info, 0, sizeof(info));
    info.topic = message->topic;
    info.payload = message->payload;
    info.payloadSize = message->payloadlen;
    info.qos = message->qos;

    // -1 is used when the message carries no expiry interval property.
    info.expiryInterval = -1;
    uint32_t msgExpiry;
    if (mosquitto_property_read_int32(props, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &msgExpiry, false) != NULL) {
        info.expiryInterval = msgExpiry;
    }

    celix_autofree char* replyTopic = NULL;
    celix_autofree void* corrData = NULL;
    uint16_t corrDataLen = 0;
    celix_autofree char* originUUID = NULL;
    celix_autofree char* msgVersion = NULL;

    // Visit every property once; each read below starts at the current list node.
    for (const mosquitto_property* prop = props; prop != NULL; prop = mosquitto_property_next(prop)) {
        const int propId = mosquitto_property_identifier(prop);
        if (propId == MQTT_PROP_RESPONSE_TOPIC) {
            assert(replyTopic == NULL);//It is MQTT Protocol Error to include the Response Topic more than once
            if (mosquitto_property_read_string(prop, MQTT_PROP_RESPONSE_TOPIC, &replyTopic, false) == NULL) {
                celix_logHelper_error(self->logHelper, "Failed to get response topic from sync event %s.", message->topic);
                return;
            }
        } else if (propId == MQTT_PROP_CORRELATION_DATA) {
            assert(corrData == NULL);//It is MQTT Protocol Error to include the Correlation Data more than once
            if (mosquitto_property_read_binary(prop, MQTT_PROP_CORRELATION_DATA, &corrData, &corrDataLen, false) == NULL) {
                celix_logHelper_error(self->logHelper, "Failed to get correlation data from sync event %s.", message->topic);
                return;
            }
        } else if (propId == MQTT_PROP_USER_PROPERTY) {
            celix_autofree char* propName = NULL;
            celix_autofree char* propValue = NULL;
            if (mosquitto_property_read_string_pair(prop, MQTT_PROP_USER_PROPERTY, &propName, &propValue, false) == NULL) {
                celix_logHelper_error(self->logHelper, "Failed to get user property from sync event %s.", message->topic);
                return;
            }
            // Only the first occurrence of each recognised user property is kept; others are ignored.
            if (celix_utils_stringEquals(propName, CELIX_EARPM_MQTT_USER_PROP_SENDER_UUID) && originUUID == NULL) {
                originUUID = celix_steal_ptr(propValue);
            } else if (celix_utils_stringEquals(propName, CELIX_EARPM_MQTT_USER_PROP_MSG_VERSION) && msgVersion == NULL) {
                msgVersion = celix_steal_ptr(propValue);
            }
        }
        // Any other property identifier is ignored.
    }

    info.responseTopic = replyTopic;
    info.correlationData = corrData;
    info.correlationDataSize = corrDataLen;
    info.senderUUID = originUUID;
    info.version = msgVersion;

    self->receiveMsgCallback(self->callbackHandle, &info);
}