in iothub_client/src/iothubtransportamqp_methods.c [249:441]
/*
 * Message-receiver callback that turns an incoming AMQP message into a
 * device-method request.
 *
 * context: the IOTHUBTRANSPORT_AMQP_METHODS_HANDLE this receiver belongs to.
 *          It is dereferenced without a NULL check.
 * message: the received message; NULL is treated as an error.
 *
 * Steps, in order: read the message properties and the correlation-id,
 * allocate a method-request handle, grow the handle array by one slot, decode
 * the correlation-id as a uuid, fetch the body, look up the
 * "IoThub-methodname" application property, create the "accepted" delivery
 * state, register the request in the handle array and finally invoke
 * on_method_request_received.
 *
 * Returns the delivery state for the message:
 *  - accepted: the callback returned 0;
 *  - rejected ("amqp:decode-error"): a required piece of the message could
 *    not be read;
 *  - rejected ("amqp:internal-error"): the callback returned non-zero;
 *  - released: NULL message, allocation failure, or the application
 *    properties map / key / accepted state could not be obtained. In this
 *    case on_methods_error is also invoked.
 */
static AMQP_VALUE on_message_received(const void* context, MESSAGE_HANDLE message)
{
    IOTHUBTRANSPORT_AMQP_METHODS_HANDLE methods = (IOTHUBTRANSPORT_AMQP_METHODS_HANDLE)context;
    /* VS believes this is not initialized, so have to set it to the worse case here */
    AMQP_VALUE delivery_state = NULL;
    MESSAGE_OUTCOME outcome;

    /* Everything is declared (and, for the handles released by the cleanup
       ladder, initialized) up front so that no goto skips an initializer. */
    PROPERTIES_HANDLE msg_properties = NULL;
    AMQP_VALUE correlation_value = NULL;
    IOTHUBTRANSPORT_AMQP_METHOD* request = NULL;
    IOTHUBTRANSPORT_AMQP_METHOD_HANDLE* grown_handles = NULL;
    size_t grown_size;
    BINARY_DATA payload;
    AMQP_VALUE app_properties = NULL;
    AMQP_VALUE app_properties_map = NULL;
    AMQP_VALUE name_key = NULL;
    AMQP_VALUE name_value = NULL;
    const char* method_name = NULL;

    if (message == NULL)
    {
        LogError("NULL message");
        outcome = MESSAGE_OUTCOME_RELEASED;
        goto settle;
    }

    if (message_get_properties(message, &msg_properties) != 0)
    {
        LogError("Cannot retrieve message properties");
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:decode-error", "Cannot retrieve message properties");
        /* nothing was acquired, so skip the cleanup ladder entirely */
        goto settle;
    }

    /* correlation_value is only read below; this function never destroys it */
    if (properties_get_correlation_id(msg_properties, &correlation_value) != 0)
    {
        LogError("Cannot retrieve correlation id");
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:decode-error", "Cannot retrieve correlation id");
        goto destroy_properties;
    }

    request = (IOTHUBTRANSPORT_AMQP_METHOD*)malloc(sizeof(*request));
    if (request == NULL)
    {
        LogError("Cannot allocate method handle");
        outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_properties;
    }

    /* Make room for one more handle. SIZE_MAX from the safe_* helpers means
       the size computation overflowed, in which case realloc is not attempted. */
    grown_size = safe_multiply_size_t(safe_add_size_t(methods->method_request_handle_count, 1), sizeof(IOTHUBTRANSPORT_AMQP_METHOD_HANDLE));
    if (grown_size != SIZE_MAX)
    {
        grown_handles = (IOTHUBTRANSPORT_AMQP_METHOD_HANDLE*)realloc(methods->method_request_handles, grown_size);
    }
    if (grown_handles == NULL)
    {
        free(request);
        LogError("Cannot grow method handles array, size:%zu", grown_size);
        outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_properties;
    }

    /* The array stays grown even if a later step fails; the extra slot is
       simply unused because the count is only bumped right before the callback. */
    methods->method_request_handles = grown_handles;

    if (amqpvalue_get_uuid(correlation_value, &request->correlation_id) != 0)
    {
        free(request);
        LogError("Cannot get uuid value for correlation-id");
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:decode-error", "Cannot get uuid value for correlation-id");
        goto destroy_properties;
    }

    if (message_get_body_amqp_data_in_place(message, 0, &payload) != 0)
    {
        free(request);
        LogError("Cannot get method request message payload");
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:decode-error", "Cannot get method request message payload");
        goto destroy_properties;
    }

    if (message_get_application_properties(message, &app_properties) != 0)
    {
        LogError("Cannot get application properties");
        free(request);
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:decode-error", "Cannot get application properties");
        goto destroy_properties;
    }

    app_properties_map = amqpvalue_get_inplace_described_value(app_properties);
    if (app_properties_map == NULL)
    {
        LogError("Cannot get application properties map");
        free(request);
        outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_application_properties;
    }

    name_key = amqpvalue_create_string("IoThub-methodname");
    if (name_key == NULL)
    {
        LogError("Cannot create the property key for method name");
        free(request);
        outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_application_properties;
    }

    name_value = amqpvalue_get_map_value(app_properties_map, name_key);
    if (name_value == NULL)
    {
        LogError("Cannot find the IoThub-methodname property in the properties map");
        free(request);
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:decode-error", "Cannot find the IoThub-methodname property in the properties map");
        goto destroy_name_key;
    }

    if (amqpvalue_get_string(name_value, &method_name) != 0)
    {
        LogError("Cannot read the method name from the property value");
        free(request);
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:decode-error", "Cannot read the method name from the property value");
        goto destroy_name_value;
    }

    /* Create the "accepted" state before invoking the callback, so an
       allocation failure here is detected while the request is still unregistered. */
    delivery_state = messaging_delivery_accepted();
    if (delivery_state == NULL)
    {
        LogError("Cannot allocate memory for delivery state");
        free(request);
        outcome = MESSAGE_OUTCOME_RELEASED;
        goto destroy_name_value;
    }

    /* Register the request in the slot reserved above, then hand it to the user callback. */
    request->iothubtransport_amqp_methods_handle = methods;
    methods->method_request_handles[methods->method_request_handle_count] = request;
    methods->method_request_handle_count++;

    if (methods->on_method_request_received(methods->on_method_request_received_context, method_name, payload.bytes, payload.length, request) == 0)
    {
        outcome = MESSAGE_OUTCOME_ACCEPTED;
    }
    else
    {
        /* Undo the registration: drop the accepted state, free the request
           and give its slot back by decrementing the count. */
        LogError("Cannot execute the callback with the given data");
        amqpvalue_destroy(delivery_state);
        free(request);
        methods->method_request_handle_count--;
        outcome = MESSAGE_OUTCOME_REJECTED;
        delivery_state = messaging_delivery_rejected("amqp:internal-error", "Cannot execute the callback with the given data");
    }

    /* Cleanup ladder: each label releases one resource and falls through to
       release the ones that were acquired before it. */
destroy_name_value:
    amqpvalue_destroy(name_value);
destroy_name_key:
    amqpvalue_destroy(name_key);
destroy_application_properties:
    application_properties_destroy(app_properties);
destroy_properties:
    properties_destroy(msg_properties);
settle:
    /* Only the "released" outcome needs work here: build its delivery state
       and report the error. Rejected/accepted states were already created above. */
    if (outcome == MESSAGE_OUTCOME_RELEASED)
    {
        delivery_state = messaging_delivery_released();
        methods->on_methods_error(methods->on_methods_error_context);
    }

    return delivery_state;
}