in bundles/event_admin/remote_provider/remote_provider_mqtt/src/celix_earpm_client.c [999:1070]
static void celix_earpmClient_messageCallback(struct mosquitto* mosq CELIX_UNUSED, void* handle,
const struct mosquitto_message* message, const mosquitto_property* props) {
assert(handle != NULL);
assert(message != NULL);
assert(message->topic != NULL);
celix_earpm_client_t* client = handle;
celix_logHelper_trace(client->logHelper, "Received message on topic %s.", message->topic);
celix_earpm_client_request_info_t requestInfo;
memset(&requestInfo, 0, sizeof(requestInfo));
requestInfo.topic = message->topic;
requestInfo.payload = message->payload;
requestInfo.payloadSize = message->payloadlen;
requestInfo.qos = message->qos;
requestInfo.expiryInterval = -1;
uint32_t expiryInterval;
if (mosquitto_property_read_int32(props, MQTT_PROP_MESSAGE_EXPIRY_INTERVAL, &expiryInterval, false) != NULL) {
requestInfo.expiryInterval = expiryInterval;
}
celix_autofree char* responseTopic = NULL;
celix_autofree void* correlationData = NULL;
uint16_t correlationDataSize = 0;
celix_autofree char* senderUUID = NULL;
celix_autofree char* version = NULL;
while (props) {
int id = mosquitto_property_identifier(props);
switch (id) {
case MQTT_PROP_RESPONSE_TOPIC: {
assert(responseTopic == NULL);//It is MQTT Protocol Error to include the Response Topic more than once
if (mosquitto_property_read_string(props, MQTT_PROP_RESPONSE_TOPIC, &responseTopic, false) == NULL) {
celix_logHelper_error(client->logHelper, "Failed to get response topic from sync event %s.", message->topic);
return;
}
break;
}
case MQTT_PROP_CORRELATION_DATA: {
assert(correlationData == NULL);//It is MQTT Protocol Error to include the Correlation Data more than once
if (mosquitto_property_read_binary(props, MQTT_PROP_CORRELATION_DATA, &correlationData, &correlationDataSize, false) == NULL) {
celix_logHelper_error(client->logHelper, "Failed to get correlation data from sync event %s.", message->topic);
return;
}
break;
}
case MQTT_PROP_USER_PROPERTY: {
celix_autofree char* strName = NULL;
celix_autofree char* strValue = NULL;
if (mosquitto_property_read_string_pair(props, MQTT_PROP_USER_PROPERTY, &strName, &strValue, false) == NULL) {
celix_logHelper_error(client->logHelper, "Failed to get user property from sync event %s.", message->topic);
return;
}
if (celix_utils_stringEquals(strName, CELIX_EARPM_MQTT_USER_PROP_SENDER_UUID) && senderUUID == NULL) {
senderUUID = celix_steal_ptr(strValue);
} else if (celix_utils_stringEquals(strName, CELIX_EARPM_MQTT_USER_PROP_MSG_VERSION) && version == NULL) {
version = celix_steal_ptr(strValue);
}
break;
}
default:
break;//do nothing
}
props = mosquitto_property_next(props);
}
requestInfo.responseTopic = responseTopic;
requestInfo.correlationData = correlationData;
requestInfo.correlationDataSize = correlationDataSize;
requestInfo.senderUUID = senderUUID;
requestInfo.version = version;
client->receiveMsgCallback(client->callbackHandle, &requestInfo);
return;
}