static AMQP_MESSENGER_DISPOSITION_RESULT on_amqp_message_received_callback()

in iothub_client/src/iothubtransport_amqp_twin_messenger.c [1341:1563]


// Handles a TWIN message delivered by the AMQP messenger.
//
// The message is parsed with parse_incoming_twin_message() and dispatched as follows:
//   - If it carries a correlation-id, it is treated as the response to a previously sent request.
//     The matching TWIN_OPERATION_CONTEXT is looked up in twin_msgr->operations, handled according
//     to its type (PATCH, GET, GET_ON_DEMAND, PUT or DELETE), destroyed, and its list item removed.
//   - If it carries no correlation-id but has a report, the report is forwarded to
//     twin_msgr->on_message_received_callback as TWIN_UPDATE_TYPE_PARTIAL.
//   - Otherwise an error is logged.
//
// Parameters:
//   message          - the incoming message; must not be NULL.
//   disposition_info - destroyed through amqp_messenger_destroy_disposition_info() once the
//                      arguments have been validated.
//   context          - the TWIN_MESSENGER_INSTANCE registered with the messenger; must not be NULL.
//
// Returns:
//   AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED if message or context is NULL, if a PATCH response
//   has no status code, or if a GET / GET_ON_DEMAND response has no report.
//   AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED in every other case (including parse failures and
//   responses whose correlation-id matches no pending operation).
static AMQP_MESSENGER_DISPOSITION_RESULT on_amqp_message_received_callback(MESSAGE_HANDLE message, AMQP_MESSENGER_MESSAGE_DISPOSITION_INFO* disposition_info, void* context)
{

    AMQP_MESSENGER_DISPOSITION_RESULT disposition_result;

    if (message == NULL || context == NULL)
    {
        // NOTE(review): disposition_info is not destroyed on this path; confirm who owns it
        // when the callback rejects the message because of invalid arguments.
        LogError("Invalid argument (message=%p, context=%p)", message, context);
        disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;
    }
    else
    {
        TWIN_MESSENGER_INSTANCE* twin_msgr = (TWIN_MESSENGER_INSTANCE*)context;

        char* correlation_id;

        bool has_status_code;
        int status_code;

        // The version is parsed because parse_incoming_twin_message() requires the out-parameters,
        // but it is not used by this function.
        bool has_version;
        int64_t version;

        bool has_twin_report;
        BINARY_DATA twin_report;

        // The disposition is reported through the return value, so the info is released up front.
        amqp_messenger_destroy_disposition_info(disposition_info);
        disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_ACCEPTED;

        if (parse_incoming_twin_message(message, &correlation_id, &has_version, &version, &has_status_code, &status_code, &has_twin_report, &twin_report) != 0)
        {
            LogError("Failed parsing incoming TWIN message (%s)", twin_msgr->device_id);
        }
        else
        {
            if (correlation_id != NULL)
            {
                // It is supposed to be a request sent previously (reported properties PATCH, GET, PUT or DELETE).

                LIST_ITEM_HANDLE list_item;
                if ((list_item = singlylinkedlist_find(twin_msgr->operations, find_twin_operation_by_correlation_id, (const void*)correlation_id)) == NULL)
                {
                    LogError("Could not find context of TWIN incoming message (%s, %s)", twin_msgr->device_id, correlation_id);
                }
                else
                {
                    TWIN_OPERATION_CONTEXT* twin_op_ctx;

                    if ((twin_op_ctx = (TWIN_OPERATION_CONTEXT*)singlylinkedlist_item_get_value(list_item)) == NULL)
                    {
                        LogError("Could not get context for incoming TWIN message (%s, %s)", twin_msgr->device_id, correlation_id);
                    }
                    else
                    {
                        if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PATCH)
                        {
                            // Response to a reported-properties update: the status code is relayed to the requester.
                            if (!has_status_code)
                            {
                                LogError("Received an incoming TWIN message for a PATCH operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id);

                                disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;

                                if (twin_op_ctx->cb.reported_properties.callback != NULL)
                                {
                                    twin_op_ctx->cb.reported_properties.callback(TWIN_REPORT_STATE_RESULT_ERROR, TWIN_REPORT_STATE_REASON_INVALID_RESPONSE, 0, twin_op_ctx->cb.reported_properties.context);
                                }
                            }
                            else
                            {
                                if (twin_op_ctx->cb.reported_properties.callback != NULL)
                                {
                                    twin_op_ctx->cb.reported_properties.callback(TWIN_REPORT_STATE_RESULT_SUCCESS, TWIN_REPORT_STATE_REASON_NONE, status_code, twin_op_ctx->cb.reported_properties.context);
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET)
                        {
                            // Response to the complete-properties GET issued as part of the subscription sequence.
                            if (!has_twin_report)
                            {
                                LogError("Received an incoming TWIN message for a GET operation, but with no report (%s, %s)", twin_msgr->device_id, correlation_id);

                                disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;

                                if (twin_op_ctx->msgr->on_message_received_callback != NULL)
                                {
                                    twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, NULL, 0, twin_op_ctx->msgr->on_message_received_context);
                                }

                                // Step back so the GET is requested again, and account for the failure.
                                if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_GET_COMPLETE_PROPERTIES;
                                    twin_msgr->subscription_error_count++;
                                }
                            }
                            else
                            {
                                if (twin_op_ctx->msgr->on_message_received_callback != NULL)
                                {
                                    twin_op_ctx->msgr->on_message_received_callback(TWIN_UPDATE_TYPE_COMPLETE, (const char*)twin_report.bytes, twin_report.length, twin_op_ctx->msgr->on_message_received_context);
                                }

                                // Complete properties received: advance to the next subscription step.
                                if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_GETTING_COMPLETE_PROPERTIES)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES;
                                    twin_msgr->subscription_error_count = 0;
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_GET_ON_DEMAND)
                        {
                            // Response to an explicit get-twin request; it does not touch the subscription state.
                            // The callback is NULL-checked for consistency with the other operation types.
                            if (!has_twin_report)
                            {
                                LogError("Received an incoming TWIN message for a GET operation, but with no report (%s, %s)", twin_msgr->device_id, correlation_id);

                                disposition_result = AMQP_MESSENGER_DISPOSITION_RESULT_REJECTED;

                                if (twin_op_ctx->cb.get_twin.callback != NULL)
                                {
                                    twin_op_ctx->cb.get_twin.callback(TWIN_UPDATE_TYPE_COMPLETE, NULL, 0, twin_op_ctx->cb.get_twin.context);
                                }
                            }
                            else
                            {
                                if (twin_op_ctx->cb.get_twin.callback != NULL)
                                {
                                    twin_op_ctx->cb.get_twin.callback(TWIN_UPDATE_TYPE_COMPLETE, (const char*)twin_report.bytes, twin_report.length, twin_op_ctx->cb.get_twin.context);
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_PUT)
                        {
                            // Response to the subscription request. It only affects the state machine while
                            // the request is in flight (SUBSCRIBING); in any other state it is ignored.
                            if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_SUBSCRIBING)
                            {
                                bool subscription_succeeded = true;

                                if (!has_status_code)
                                {
                                    LogError("Received an incoming TWIN message for a PUT operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id);

                                    subscription_succeeded = false;
                                }
                                else if (status_code < 200 || status_code >= 300)
                                {
                                    // Anything outside the 2xx range is treated as a failed subscription.
                                    LogError("Received status code %d for TWIN subscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id);

                                    subscription_succeeded = false;
                                }

                                if (subscription_succeeded)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBED;
                                    twin_msgr->subscription_error_count = 0;
                                }
                                else
                                {
                                    // Step back so the subscription is requested again, and account for the failure.
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_SUBSCRIBE_FOR_UPDATES;
                                    twin_msgr->subscription_error_count++;
                                }
                            }
                        }
                        else if (twin_op_ctx->type == TWIN_OPERATION_TYPE_DELETE)
                        {
                            // Response to the unsubscription request. It only affects the state machine while
                            // the request is in flight (UNSUBSCRIBING); in any other state it is ignored.
                            if (twin_msgr->subscription_state == TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBING)
                            {
                                bool unsubscription_succeeded = true;

                                if (!has_status_code)
                                {
                                    LogError("Received an incoming TWIN message for a DELETE operation, but with no status code (%s, %s)", twin_msgr->device_id, correlation_id);

                                    unsubscription_succeeded = false;
                                }
                                else if (status_code < 200 || status_code >= 300)
                                {
                                    // Anything outside the 2xx range is treated as a failed unsubscription.
                                    LogError("Received status code %d for TWIN unsubscription request (%s, %s)", status_code, twin_msgr->device_id, correlation_id);

                                    unsubscription_succeeded = false;
                                }

                                if (unsubscription_succeeded)
                                {
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_NOT_SUBSCRIBED;
                                    twin_msgr->subscription_error_count = 0;
                                }
                                else
                                {
                                    // Step back so the unsubscription is requested again, and account for the failure.
                                    twin_msgr->subscription_state = TWIN_SUBSCRIPTION_STATE_UNSUBSCRIBE;
                                    twin_msgr->subscription_error_count++;
                                }
                            }
                        }

                        destroy_twin_operation_context(twin_op_ctx);
                    }

                    // The list item is removed even if its value could not be retrieved, so that a
                    // stale entry does not linger in the pending-operations list.
                    if (singlylinkedlist_remove(twin_msgr->operations, list_item) != 0)
                    {
                        LogError("Failed removing context for incoming TWIN message (%s, %s)",
                            twin_msgr->device_id, correlation_id);

                        update_state(twin_msgr, TWIN_MESSENGER_STATE_ERROR);
                    }
                }

                // correlation_id is released here, after its last use in the log messages above.
                free(correlation_id);
            }
            else if (has_twin_report)
            {
                // It is supposed to be a desired properties delta update.

                if (twin_msgr->on_message_received_callback != NULL)
                {
                    twin_msgr->on_message_received_callback(TWIN_UPDATE_TYPE_PARTIAL, (const char*)twin_report.bytes, twin_report.length, twin_msgr->on_message_received_context);
                }
            }
            else
            {
                LogError("Received TWIN message with no correlation-id and no report (%s)", twin_msgr->device_id);
            }
        }
    }

    return disposition_result;
}