in testtools/iothub_test/src/iothubtest.c [1746:2014]
/**
 * Sends one binary message over AMQP using the hub/device information held by
 * the test handle.
 *
 * The function builds a SASL-plain-over-TLS transport (port 5671) to the
 * handle's host name, opens a connection/session/sender link to the address
 * produced by CreateSendTargetAddress(), sets the message "to" property from
 * AMQP_ADDRESS_PATH_FMT and the handle's deviceId, and then pumps the
 * connection until the send completes or the polling budget
 * (100 iterations with a 50 ms sleep between them) is exhausted.
 *
 * @param devhubHandle  Test handle; must not be NULL. It is treated as an
 *                      IOTHUB_VALIDATION_INFO*.
 * @param data          Payload bytes. May be NULL only when len is 0.
 * @param len           Payload length in bytes. Must be 0 if and only if
 *                      data is NULL.
 *
 * @return IOTHUB_TEST_CLIENT_OK when the send state reaches
 *         MESSAGE_SEND_STATE_SENT_OK; IOTHUB_TEST_CLIENT_ERROR on invalid
 *         arguments, on any setup failure, or when the send fails or does
 *         not complete within the polling budget.
 */
IOTHUB_TEST_CLIENT_RESULT IoTHubTest_SendMessage(IOTHUB_TEST_HANDLE devhubHandle, const unsigned char* data, size_t len)
{
    IOTHUB_TEST_CLIENT_RESULT result;

    /* data and len must agree: either both describe a payload or both describe "no payload". */
    if ((devhubHandle == NULL) ||
        ((len == 0) && (data != NULL)) ||
        ((data == NULL) && (len != 0)))
    {
        LogError("Invalid arguments for IoTHubTest_SendMessage, devhubHandle = %p, len = %zu, data = %p.", devhubHandle, len, data);
        result = IOTHUB_TEST_CLIENT_ERROR;
    }
    else
    {
        IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)devhubHandle;
        char* authcid = CreateSendAuthCid(devhubValInfo);
        if (authcid == NULL)
        {
            LogError("Could not create authcid for SASL plain.");
            result = IOTHUB_TEST_CLIENT_ERROR;
        }
        else
        {
            /* All handles start as NULL so the common teardown at the end of this
               scope can run regardless of how far setup progressed. */
            XIO_HANDLE sasl_io = NULL;
            CONNECTION_HANDLE connection = NULL;
            SESSION_HANDLE session = NULL;
            LINK_HANDLE link = NULL;
            MESSAGE_SENDER_HANDLE message_sender = NULL;
            SASL_MECHANISM_HANDLE sasl_mechanism_handle = NULL;
            XIO_HANDLE tls_io = NULL;
            /* The address of this variable is handed to the message sender as the
               callback context, so it must stay alive until the sender has been
               destroyed below (the sender may still invoke the callback during
               teardown - TODO confirm against the uAMQP implementation in use). */
            MESSAGE_SEND_STATE message_send_state = MESSAGE_SEND_STATE_NOT_SENT;
            char* target_address = CreateSendTargetAddress(devhubValInfo);
            if (target_address == NULL)
            {
                LogError("Could not create target_address string.");
                result = IOTHUB_TEST_CLIENT_ERROR;
            }
            else
            {
                /* Upper bound for the formatted string: the format text itself plus the device id. */
                size_t deviceDestLen = strlen(AMQP_ADDRESS_PATH_FMT) + strlen(devhubValInfo->deviceId) + 1;
                char* deviceDest = (char*)malloc(deviceDestLen + 1);
                if (deviceDest == NULL)
                {
                    LogError("Could not create device destination string.");
                    result = IOTHUB_TEST_CLIENT_ERROR;
                }
                else
                {
                    SASL_PLAIN_CONFIG sasl_plain_config;
                    sasl_plain_config.authcid = authcid;
                    sasl_plain_config.passwd = devhubValInfo->iotSharedSig;
                    sasl_plain_config.authzid = NULL;
                    const SASL_MECHANISM_INTERFACE_DESCRIPTION* sasl_mechanism_interface_description;

                    if (sprintf_s(deviceDest, deviceDestLen + 1, AMQP_ADDRESS_PATH_FMT, devhubValInfo->deviceId) < 0)
                    {
                        LogError("Could not format device destination string.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else if ((sasl_mechanism_interface_description = saslplain_get_interface()) == NULL)
                    {
                        LogError("Could not get SASL plain mechanism interface.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else if ((sasl_mechanism_handle = saslmechanism_create(sasl_mechanism_interface_description, &sasl_plain_config)) == NULL)
                    {
                        LogError("Could not create SASL plain mechanism.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else
                    {
                        /* create the TLS IO */
                        TLSIO_CONFIG tls_io_config;
                        tls_io_config.hostname = devhubValInfo->hostName;
                        tls_io_config.port = 5671;
                        tls_io_config.underlying_io_interface = NULL;
                        tls_io_config.underlying_io_parameters = NULL;
                        const IO_INTERFACE_DESCRIPTION* tlsio_interface;
                        if ((tlsio_interface = platform_get_default_tlsio()) == NULL)
                        {
                            LogError("Could not get default TLS IO interface.");
                            result = IOTHUB_TEST_CLIENT_ERROR;
                        }
                        else if ((tls_io = xio_create(tlsio_interface, &tls_io_config)) == NULL)
                        {
                            LogError("Could not create TLS IO.");
                            result = IOTHUB_TEST_CLIENT_ERROR;
                        }
                        else
                        {
                            /* create the SASL client IO using the TLS IO */
                            SASLCLIENTIO_CONFIG sasl_io_config;
                            sasl_io_config.underlying_io = tls_io;
                            sasl_io_config.sasl_mechanism = sasl_mechanism_handle;
                            const IO_INTERFACE_DESCRIPTION* saslclientio_interface;
                            if ((saslclientio_interface = saslclientio_get_interface_description()) == NULL)
                            {
                                LogError("Could not create get SASL IO interface description.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if ((sasl_io = xio_create(saslclientio_interface, &sasl_io_config)) == NULL)
                            {
                                LogError("Could not create SASL IO.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            /* create the connection, session and link */
                            else if ((connection = connection_create(sasl_io, devhubValInfo->hostName, "some", NULL, NULL)) == NULL)
                            {
                                LogError("Could not create connection.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if ((session = session_create(connection, NULL, NULL)) == NULL)
                            {
                                LogError("Could not create session.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if (session_set_incoming_window(session, 2147483647) != 0)
                            {
                                LogError("Could not set incoming window.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else if (session_set_outgoing_window(session, 65536) != 0)
                            {
                                LogError("Could not set outgoing window.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else
                            {
                                AMQP_VALUE source = NULL;
                                AMQP_VALUE target = NULL;
                                MESSAGE_HANDLE message = NULL;
                                if ((source = messaging_create_source("ingress")) == NULL)
                                {
                                    LogError("Could not create source for link.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if ((target = messaging_create_target(target_address)) == NULL)
                                {
                                    LogError("Could not create target for link.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if ((link = link_create(session, "sender-link", role_sender, source, target)) == NULL)
                                {
                                    LogError("Could not create link.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if (link_set_snd_settle_mode(link, sender_settle_mode_unsettled) != 0)
                                {
                                    LogError("Could not set the sender settle mode.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if (link_set_max_message_size(link, 65536) != 0)
                                {
                                    LogError("Could not set the message size.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else if ((message = message_create()) == NULL)
                                {
                                    LogError("Could not create a message.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else
                                {
                                    BINARY_DATA binary_data;
                                    binary_data.bytes = data;
                                    binary_data.length = len;
                                    if (message_add_body_amqp_data(message, binary_data) != 0)
                                    {
                                        LogError("Could not add the binary data to the message.");
                                        result = IOTHUB_TEST_CLIENT_ERROR;
                                    }
                                    else
                                    {
                                        /* Validate each object before it is used: the properties
                                           handle first, then the "to" value that is stored in it. */
                                        PROPERTIES_HANDLE properties = properties_create();
                                        if (properties == NULL)
                                        {
                                            LogError("Could not create properties for message.");
                                            result = IOTHUB_TEST_CLIENT_ERROR;
                                        }
                                        else
                                        {
                                            AMQP_VALUE to_amqp_value = amqpvalue_create_string(deviceDest);
                                            if (to_amqp_value == NULL)
                                            {
                                                LogError("Could not create the destination value for the message properties.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else if (properties_set_to(properties, to_amqp_value) != 0)
                                            {
                                                LogError("Could not set the destination on the message properties.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else if (message_set_properties(message, properties) != 0)
                                            {
                                                LogError("Could not set the properties on the message.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else if ((message_sender = messagesender_create(link, NULL, NULL)) == NULL)
                                            {
                                                LogError("Could not create message sender.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else if (messagesender_open(message_sender) != 0)
                                            {
                                                LogError("Could not open the message sender.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            /* NOTE(review): the result is compared against 0 as in the original
                                               code; if this uAMQP version returns a handle (NULL on failure)
                                               rather than an int, this test is inverted - confirm against the
                                               messagesender.h in use. */
                                            else if (messagesender_send_async(message_sender, message, on_message_send_complete, &message_send_state, 0) != 0)
                                            {
                                                LogError("Could not send the message.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else
                                            {
                                                size_t numberOfAttempts;
                                                message_send_state = MESSAGE_SEND_STATE_SEND_IN_PROGRESS;

                                                /* Pump the connection until the completion callback moves the
                                                   state away from "in progress" (presumably via the context
                                                   pointer; the callback is defined elsewhere) or the attempts
                                                   run out. */
                                                for (numberOfAttempts = 0; numberOfAttempts < 100; numberOfAttempts++)
                                                {
                                                    connection_dowork(connection);
                                                    if (message_send_state != MESSAGE_SEND_STATE_SEND_IN_PROGRESS)
                                                    {
                                                        break;
                                                    }
                                                    ThreadAPI_Sleep(50);
                                                }

                                                if (message_send_state == MESSAGE_SEND_STATE_SENT_OK)
                                                {
                                                    result = IOTHUB_TEST_CLIENT_OK;
                                                }
                                                else if (message_send_state == MESSAGE_SEND_STATE_SEND_IN_PROGRESS)
                                                {
                                                    LogError("Failed sending (timed out).");
                                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                                }
                                                else
                                                {
                                                    LogError("Failed sending (send did not complete successfully).");
                                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                                }
                                            }

                                            /* Called unconditionally, matching how source/target are
                                               released below even when they are still NULL. */
                                            amqpvalue_destroy(to_amqp_value);
                                            properties_destroy(properties);
                                        }
                                    }
                                }
                                message_destroy(message);
                                amqpvalue_destroy(source);
                                amqpvalue_destroy(target);
                            }
                        }
                    }
                    free(deviceDest);
                }
                free(target_address);
            }
            free(authcid);

            /* Tear down in reverse order of creation; handles that were never
               created are still NULL at this point. */
            messagesender_destroy(message_sender);
            link_destroy(link);
            session_destroy(session);
            connection_destroy(connection);
            xio_destroy(sasl_io);
            xio_destroy(tls_io);
            saslmechanism_destroy(sasl_mechanism_handle);
        }
    }
    return result;
}