IOTHUB_TEST_CLIENT_RESULT IoTHubTest_SendMessage()

in testtools/iothub_test/src/iothubtest.c [1746:2014]


IOTHUB_TEST_CLIENT_RESULT IoTHubTest_SendMessage(IOTHUB_TEST_HANDLE devhubHandle, const unsigned char* data, size_t len)
{
    IOTHUB_TEST_CLIENT_RESULT result;

    if ((devhubHandle == NULL) ||
        ((len == 0) && (data != NULL)) ||
        ((data != NULL) && (len == 0)))
    {
        LogError("Invalid arguments for IoTHubTest_SendMessage, devhubHandle = %p, len = %zu, data = %p.", devhubHandle, len, data);
        result = IOTHUB_TEST_CLIENT_ERROR;
    }
    else
    {
        IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)devhubHandle;
        char* authcid = CreateSendAuthCid(devhubValInfo);
        if (authcid == NULL)
        {
            LogError("Could not create authcid for SASL plain.");
            result = IOTHUB_TEST_CLIENT_ERROR;
        }
        else
        {
            XIO_HANDLE sasl_io = NULL;
            CONNECTION_HANDLE connection = NULL;
            SESSION_HANDLE session = NULL;
            LINK_HANDLE link = NULL;
            MESSAGE_SENDER_HANDLE message_sender = NULL;
            SASL_MECHANISM_HANDLE sasl_mechanism_handle = NULL;
            XIO_HANDLE tls_io = NULL;

            char* target_address = CreateSendTargetAddress(devhubValInfo);
            if (target_address == NULL)
            {
                LogError("Could not create target_address string.");
                result = IOTHUB_TEST_CLIENT_ERROR;
            }
            else
            {
                size_t deviceDestLen = strlen(AMQP_ADDRESS_PATH_FMT) + strlen(devhubValInfo->deviceId) + 1;
                char* deviceDest = (char*)malloc(deviceDestLen + 1);
                if (deviceDest == NULL)
                {
                    LogError("Could not create device destination string.");
                    result = IOTHUB_TEST_CLIENT_ERROR;
                }
                else
                {
                    SASL_PLAIN_CONFIG sasl_plain_config;
                    sasl_plain_config.authcid = authcid;
                    sasl_plain_config.passwd = devhubValInfo->iotSharedSig;
                    sasl_plain_config.authzid = NULL;
                    const SASL_MECHANISM_INTERFACE_DESCRIPTION* sasl_mechanism_interface_description;

                    (void)sprintf_s(deviceDest, deviceDestLen + 1, AMQP_ADDRESS_PATH_FMT, devhubValInfo->deviceId);

                    if ((sasl_mechanism_interface_description = saslplain_get_interface()) == NULL)
                    {
                        LogError("Could not get SASL plain mechanism interface.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else if ((sasl_mechanism_handle = saslmechanism_create(sasl_mechanism_interface_description, &sasl_plain_config)) == NULL)
                    {
                        LogError("Could not create SASL plain mechanism.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else
                    {
                        /* create the TLS IO */
                        TLSIO_CONFIG tls_io_config;
                        tls_io_config.hostname = devhubValInfo->hostName;
                        tls_io_config.port = 5671;
                        tls_io_config.underlying_io_interface = NULL;
                        tls_io_config.underlying_io_parameters = NULL;
                        const IO_INTERFACE_DESCRIPTION* tlsio_interface;

                        if ((tlsio_interface = platform_get_default_tlsio()) == NULL)
                        {
                            LogError("Could not get default TLS IO interface.");
                            result = IOTHUB_TEST_CLIENT_ERROR;
                        }
                        else if ((tls_io = xio_create(tlsio_interface, &tls_io_config)) == NULL)
                        {
                            LogError("Could not create TLS IO.");
                            result = IOTHUB_TEST_CLIENT_ERROR;
                        }
                        else
                        {
                            /* create the SASL client IO using the TLS IO */
                            SASLCLIENTIO_CONFIG sasl_io_config;
                            sasl_io_config.underlying_io = tls_io;
                            sasl_io_config.sasl_mechanism = sasl_mechanism_handle;
                            const IO_INTERFACE_DESCRIPTION* saslclientio_interface;

                            if ((saslclientio_interface = saslclientio_get_interface_description()) == NULL)
                            {
                                LogError("Could not create get SASL IO interface description.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if ((sasl_io = xio_create(saslclientio_interface, &sasl_io_config)) == NULL)
                            {
                                LogError("Could not create SASL IO.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            /* create the connection, session and link */
                            else if ((connection = connection_create(sasl_io, devhubValInfo->hostName, "some", NULL, NULL)) == NULL)
                            {
                                LogError("Could not create connection.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if ((session = session_create(connection, NULL, NULL)) == NULL)
                            {
                                LogError("Could not create session.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if (session_set_incoming_window(session, 2147483647) != 0)
                            {
                                LogError("Could not set incoming window.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if (session_set_outgoing_window(session, 65536) != 0)
                            {
                                LogError("Could not set outgoing window.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else
                            {
                                AMQP_VALUE source = NULL;
                                AMQP_VALUE target = NULL;
                                MESSAGE_HANDLE message = NULL;

                                if ((source = messaging_create_source("ingress")) == NULL)
                                {
                                    LogError("Could not create source for link.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if ((target = messaging_create_target(target_address)) == NULL)
                                {
                                    LogError("Could not create target for link.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if ((link = link_create(session, "sender-link", role_sender, source, target)) == NULL)
                                {
                                    LogError("Could not create link.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if (link_set_snd_settle_mode(link, sender_settle_mode_unsettled) != 0)
                                {
                                    LogError("Could not set the sender settle mode.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if (link_set_max_message_size(link, 65536) != 0)
                                {
                                    LogError("Could not set the message size.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if ((message = message_create()) == NULL)
                                {
                                    LogError("Could not create a message.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else
                                {
                                    BINARY_DATA binary_data;
                                    binary_data.bytes = data;
                                    binary_data.length = len;
                                    if (message_add_body_amqp_data(message, binary_data) != 0)
                                    {
                                        LogError("Could not add the binary data to the message.");
                                        result = IOTHUB_TEST_CLIENT_ERROR;
                                    }
                                    else
                                    {
                                        PROPERTIES_HANDLE properties = properties_create();
                                        AMQP_VALUE to_amqp_value = amqpvalue_create_string(deviceDest);
                                        (void)properties_set_to(properties, to_amqp_value);
                                        amqpvalue_destroy(to_amqp_value);

                                        if (properties == NULL)
                                        {
                                            LogError("Could not create properties for message.");
                                            result = IOTHUB_TEST_CLIENT_ERROR;
                                        }
                                        else
                                        {
                                            if (message_set_properties(message, properties) != 0)
                                            {
                                                LogError("Could not set the properties on the message.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else if ((message_sender = messagesender_create(link, NULL, NULL)) == NULL)
                                            {
                                                LogError("Could not create message sender.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else if (messagesender_open(message_sender) != 0)
                                            {
                                                LogError("Could not open the message sender.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else
                                            {
                                                MESSAGE_SEND_STATE message_send_state = MESSAGE_SEND_STATE_NOT_SENT;

                                                if (messagesender_send_async(message_sender, message, on_message_send_complete, &message_send_state, 0) != 0)
                                                {
                                                    LogError("Could not set outgoing window.");
                                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                                }
                                                else
                                                {
                                                    size_t numberOfAttempts;
                                                    message_send_state = MESSAGE_SEND_STATE_SEND_IN_PROGRESS;

                                                    for (numberOfAttempts = 0; numberOfAttempts < 100; numberOfAttempts++)
                                                    {
                                                        connection_dowork(connection);

                                                        if (message_send_state != MESSAGE_SEND_STATE_SEND_IN_PROGRESS)
                                                        {
                                                            break;
                                                        }

                                                        ThreadAPI_Sleep(50);
                                                    }

                                                    if (message_send_state != MESSAGE_SEND_STATE_SENT_OK)
                                                    {
                                                        LogError("Failed sending (timed out).");
                                                        result = IOTHUB_TEST_CLIENT_ERROR;
                                                    }
                                                    else
                                                    {
                                                        result = IOTHUB_TEST_CLIENT_OK;
                                                    }
                                                }
                                            }

                                            properties_destroy(properties);
                                        }
                                    }
                                }

                                message_destroy(message);
                                amqpvalue_destroy(source);
                                amqpvalue_destroy(target);
                            }
                        }
                    }

                    free(deviceDest);
                }

                free(target_address);
            }

            free(authcid);

            messagesender_destroy(message_sender);
            link_destroy(link);
            session_destroy(session);
            connection_destroy(connection);
            xio_destroy(sasl_io);
            xio_destroy(tls_io);
            saslmechanism_destroy(sasl_mechanism_handle);
        }
    }

    return result;
}