in iothub_client/src/iothubtransport_amqp_twin_messenger.c [1341:1563]
// Callback invoked for an incoming TWIN AMQP message.
//
// Parameters:
//   message          - the received message; must not be NULL.
//   disposition_info - destroyed here through amqp_messenger_destroy_disposition_info()
//                      once the arguments have been validated.
//   context          - the TWIN_MESSENGER_INSTANCE the message belongs to; must not be NULL.
//
// Behavior:
//   - A message carrying a correlation-id is matched against the pending entries in
//     twin_msgr->operations. The matching operation (PATCH, GET, GET_ON_DEMAND, PUT or
//     DELETE) is completed, its context destroyed and its list item removed.
//   - A message with no correlation-id but with a report is forwarded to
//     on_message_received_callback as TWIN_UPDATE_TYPE_PARTIAL.
//   - A message with neither is only logged.
//
// Returns:
//   AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED if message or context is NULL, if a PATCH
//   response has no status code, or if a GET / GET_ON_DEMAND response has no report.
//   AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED in every other case (including a parse failure).
//
// NOTE(review): disposition_info is not destroyed on the invalid-argument path; confirm
// against the caller whether ownership has already been transferred at that point.
static AMQP_MESSENGER_DISPOSITION_RESULT on_amqp_message_received_callback(MESSAGE_HANDLE message, AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, void* context)
{
    AMQP_MESSENGER_DISPOSITION_RESULT disposition_result;

    if (message == NULL || context == NULL)
    {
        LogError("Invalid argument (message=%p, context=%p)", message, context);
        disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;
    }
    else
    {
        TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context;

        char* correlation_id;

        bool has_status_code;
        int status_code;

        bool has_version;
        int64_t version;

        bool has_twin_report;
        BINARY_DATA twin_report;

        amqp_messenger_destroy_disposition_info(disposition_info);

        // Default outcome; only the specific malformed-response cases below override it.
        disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED;

        if (parse_incoming_twin_message(message, &correlation_id, &has_version, &version, &has_status_code, &status_code, &has_twin_report, &twin_report) != 0)
        {
            LogError("Failed parsing incoming TWIN message (%s)", twin_msgr->device_id);
        }
        else
        {
            if (correlation_id != NULL)
            {
                // It is supposed to be a request sent previously (reported properties PATCH, GET, PUT or DELETE).
                LIST_ITEM_HANDLE list_item;

                if ((list_item = singlylinkedlist_find(twin_msgr->operations, find_twin_operation_by_correlation_id, (const void*)correlation_id)) == NULL)
                {
                    LogError("Could not find context of TWIN incoming message (%s, %s)", twin_msgr->device_id, correlation_id);
                }
                else
                {
                    TWIN_OPERATION_CONTEXT* twin_op_ctx;

                    if ((twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item)) == NULL)
                    {
                        LogError("Could not get context for incoming TWIN message (%s, %s)", twin_msgr->device_id, correlation_id);
                    }
                    else
                    {
                        if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH)
                        {
                            if (!has_status_code)
                            {
                                LogError("Received an incoming TWIN message for a PATCH operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id);

                                disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;

                                if (twin_op_ctx->cb.reported_properties.callback != NULL)
                                {
                                    twin_op_ctx->cb.reported_properties.callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INVALID_RESPONSE, 0, twin_op_ctx->cb.reported_properties.context);
                                }
                            }
                            else
                            {
                                if (twin_op_ctx->cb.reported_properties.callback != NULL)
                                {
                                    twin_op_ctx->cb.reported_properties.callback(TWIN_REPORT_STATE_RESULT_SUCCESS, TWIN_REPORT_STATE_REASON_NONE, status_code, twin_op_ctx->cb.reported_properties.context);
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET)
                        {
                            if (!has_twin_report)
                            {
                                LogError("Received an incoming TWIN message for a GET operation, but with no report (%s, %s)", twin_msgr->device_id, correlation_id);

                                disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;

                                if (twin_op_ctx->msgr->on_message_received_callback != NULL)
                                {
                                    twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, NULL, 0, twin_op_ctx->msgr->on_message_received_context);
                                }

                                // Step the subscription state back so the GET is attempted again.
                                if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES;
                                    twin_msgr->subscription_error_count++;
                                }
                            }
                            else
                            {
                                if (twin_op_ctx->msgr->on_message_received_callback != NULL)
                                {
                                    twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, (const char*)twin_report.bytes, twin_report.length, twin_op_ctx->msgr->on_message_received_context);
                                }

                                if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES;
                                    twin_msgr->subscription_error_count = 0;
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET_ON_DEMAND)
                        {
                            if (!has_twin_report)
                            {
                                LogError("Received an incoming TWIN message for a GET operation, but with no report (%s, %s)", twin_msgr->device_id, correlation_id);

                                disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;

                                // Guard the callback like every other callback invocation in this function.
                                if (twin_op_ctx->cb.get_twin.callback != NULL)
                                {
                                    twin_op_ctx->cb.get_twin.callback(TWIN_UPDATE_TYPE_COMPLETE, NULL, 0, twin_op_ctx->cb.get_twin.context);
                                }
                            }
                            else
                            {
                                if (twin_op_ctx->cb.get_twin.callback != NULL)
                                {
                                    twin_op_ctx->cb.get_twin.callback(TWIN_UPDATE_TYPE_COMPLETE, (const char*)twin_report.bytes, twin_report.length, twin_op_ctx->cb.get_twin.context);
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT)
                        {
                            // Only a messenger that is currently SUBSCRIBING reacts to the PUT response.
                            // (Guarding on SUBSCRIBED here would make the transition below unreachable.)
                            if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBING)
                            {
                                bool subscription_succeeded = true;

                                if (!has_status_code)
                                {
                                    LogError("Received an incoming TWIN message for a PUT operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id);
                                    subscription_succeeded = false;
                                }
                                else if (status_code < 200 || status_code >= 300)
                                {
                                    LogError("Received status code %d for TWIN subscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id);
                                    subscription_succeeded = false;
                                }

                                if (subscription_succeeded)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBED;
                                    twin_msgr->subscription_error_count = 0;
                                }
                                else
                                {
                                    // Step back so the subscription request is issued again.
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES;
                                    twin_msgr->subscription_error_count++;
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_DELETE)
                        {
                            // Only a messenger that is currently UNSUBSCRIBING reacts to the DELETE response.
                            // (Guarding on NOT_SUBSCRIBED here would make the transition below unreachable.)
                            if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING)
                            {
                                bool unsubscription_succeeded = true;

                                if (!has_status_code)
                                {
                                    LogError("Received an incoming TWIN message for a DELETE operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id);
                                    unsubscription_succeeded = false;
                                }
                                else if (status_code < 200 || status_code >= 300)
                                {
                                    LogError("Received status code %d for TWIN unsubscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id);
                                    unsubscription_succeeded = false;
                                }

                                if (unsubscription_succeeded)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED;
                                    twin_msgr->subscription_error_count = 0;
                                }
                                else
                                {
                                    // Step back so the unsubscription request is issued again.
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE;
                                    twin_msgr->subscription_error_count++;
                                }
                            }
                        }

                        destroy_twin_operation_context(twin_op_ctx);
                    }

                    // The list item is removed even when its value could not be retrieved.
                    if (singlylinkedlist_remove(twin_msgr->operations, list_item) != 0)
                    {
                        LogError("Failed removing context for incoming TWIN message (%s, %s)",
                            twin_msgr->device_id, correlation_id);

                        update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR);
                    }
                }

                free(correlation_id);
            }
            else if (has_twin_report)
            {
                // It is supposed to be a desired properties delta update.
                if (twin_msgr->on_message_received_callback != NULL)
                {
                    twin_msgr->on_message_received_callback(TWIN_UPDATE_TYPE_PARTIAL, (const char*)twin_report.bytes, twin_report.length, twin_msgr->on_message_received_context);
                }
            }
            else
            {
                LogError("Received TWIN message with no correlation-id and no report (%s)", twin_msgr->device_id);
            }
        }
    }

    return disposition_result;
}