in testtools/iothub_test/src/iothubtest.c [1472:1720]
IOTHUB_TEST_CLIENT_RESULT IoTHubTest_ListenForEvent(IOTHUB_TEST_HANDLE devhubHandle, pfIoTHubMessageCallback msgCallback, size_t partitionCount, void* context, time_t receiveTimeRangeStart, double maxDrainTimeInSeconds)
{
IOTHUB_TEST_CLIENT_RESULT result = 0;
if (devhubHandle == NULL || msgCallback == NULL)
{
LogError("Invalid parameter given in IoTHubTest_ListenForEvent DevhubHandle: 0x%p\r\nMessage Callback: 0x%p.", devhubHandle, msgCallback);
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
XIO_HANDLE sasl_io = NULL;
CONNECTION_HANDLE connection = NULL;
SESSION_HANDLE session = NULL;
LINK_HANDLE link = NULL;
IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)devhubHandle;
char* eh_hostname = CreateReceiveHostName(devhubValInfo);
if (eh_hostname == NULL)
{
LogError("Failed getting eh_hostname.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
char* receive_address = CreateReceiveAddress(devhubValInfo, partitionCount);
if (receive_address == NULL)
{
LogError("Failed getting receive_address.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
/* create SASL plain handler */
SASL_PLAIN_CONFIG sasl_plain_config;
sasl_plain_config.authcid = "iothubowner";
sasl_plain_config.passwd = devhubValInfo->eventhubAccessKey;
sasl_plain_config.authzid = NULL;
const SASL_MECHANISM_INTERFACE_DESCRIPTION* sasl_plain_interface_description;
SASL_MECHANISM_HANDLE sasl_mechanism_handle = NULL;
XIO_HANDLE tls_io = NULL;
TLSIO_CONFIG tls_io_config;
tls_io_config.hostname = eh_hostname;
tls_io_config.port = 5671;
tls_io_config.underlying_io_interface = NULL;
tls_io_config.underlying_io_parameters = NULL;
const IO_INTERFACE_DESCRIPTION* tlsio_interface = NULL;
if ((sasl_plain_interface_description = saslplain_get_interface()) == NULL)
{
LogError("Failed getting saslplain_get_interface.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if ((sasl_mechanism_handle = saslmechanism_create(sasl_plain_interface_description, &sasl_plain_config)) == NULL)
{
LogError("Failed creating sasl PLAN mechanism.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if ((tlsio_interface = platform_get_default_tlsio()) == NULL)
{
LogError("Failed getting default TLS IO interface.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if ((tls_io = xio_create(tlsio_interface, &tls_io_config)) == NULL)
{
LogError("Failed creating the TLS IO.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
#ifdef SET_TRUSTED_CERT
xio_setoption(tls_io, OPTION_TRUSTED_CERT, test_service_cert);
#endif // SET_TRUSTED_CERT
/* create the SASL client IO using the TLS IO */
LogInfo("IoTHubTest_ListenForEvent: SASL client IO using the TLS IO.");
SASLCLIENTIO_CONFIG sasl_io_config;
sasl_io_config.underlying_io = tls_io;
sasl_io_config.sasl_mechanism = sasl_mechanism_handle;
if ((sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_io_config)) == NULL)
{
LogError("Failed creating the SASL IO.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
/* create the connection, session and link */
else if ((connection = connection_create(sasl_io, eh_hostname, "e2etest_link", NULL, NULL)) == NULL)
{
LogError("Failed creating the connection.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if ((session = session_create(connection, NULL, NULL)) == NULL)
{
LogError("Failed creating the session.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if (session_set_incoming_window(session, 100) != 0)
{
/* set incoming window to 100 for the session */
LogError("Failed setting the session incoming window.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
char tempBuffer[256];
const char filter_name[] = "apache.org:selector-filter:string";
int filter_string_length = sprintf(tempBuffer, "amqp.annotation.x-opt-enqueuedtimeutc > %llu", ((unsigned long long)receiveTimeRangeStart - 330) * 1000);
if (filter_string_length < 0)
{
LogError("Failed creating filter set with enqueuedtimeutc filter.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
LogInfo("IoTHubTest_ListenForEvent: Create AMQP filter.");
/* create the filter set to be used for the source of the link */
filter_set filter_set = amqpvalue_create_map();
AMQP_VALUE filter_key = amqpvalue_create_symbol(filter_name);
AMQP_VALUE descriptor = amqpvalue_create_symbol(filter_name);
AMQP_VALUE filter_value = amqpvalue_create_string(tempBuffer);
AMQP_VALUE described_filter_value = amqpvalue_create_described(descriptor, filter_value);
amqpvalue_set_map_value(filter_set, filter_key, described_filter_value);
amqpvalue_destroy(described_filter_value);
amqpvalue_destroy(filter_key);
if (filter_set == NULL)
{
LogError("Failed creating filter set with enqueuedtimeutc filter.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
AMQP_VALUE target = NULL;
AMQP_VALUE source = NULL;
/* create the source of the link */
SOURCE_HANDLE source_handle = source_create();
AMQP_VALUE address_value = amqpvalue_create_string(receive_address);
source_set_address(source_handle, address_value);
source_set_filter(source_handle, filter_set);
amqpvalue_destroy(address_value);
source = amqpvalue_create_source(source_handle);
source_destroy(source_handle);
if (source == NULL)
{
LogError("Failed creating source for link.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
target = messaging_create_target(receive_address);
if (target == NULL)
{
LogError("Failed creating target for link.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if ((link = link_create(session, "receiver-link", role_receiver, source, target)) == NULL)
{
LogError("Failed creating link.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if (link_set_rcv_settle_mode(link, receiver_settle_mode_first) != 0)
{
LogError("Failed setting link receive settle mode.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
LogInfo("IoTHubTest_ListenForEvent: Create message receiver.");
MESSAGE_RECEIVER_CONTEXT message_receiver_context;
message_receiver_context.msgCallback = msgCallback;
message_receiver_context.context = context;
message_receiver_context.message_received = false;
MESSAGE_RECEIVER_HANDLE message_receiver = NULL;
MESSAGE_RECEIVER_STATE_CHANGED_CONTEXT message_receiver_state_changed_context = {0};
/* create a message receiver */
message_receiver = messagereceiver_create(link, on_message_receiver_state_changed, &message_receiver_state_changed_context);
if (message_receiver == NULL)
{
LogError("Failed creating message receiver.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if (messagereceiver_open_with_retry(message_receiver, on_message_received, &message_receiver_context) != 0)
{
LogError("Failed opening message receiver.");
result = IOTHUB_TEST_CLIENT_ERROR;
messagereceiver_destroy(message_receiver);
}
else
{
time_t nowExecutionTime;
time_t beginExecutionTime = time(NULL);
double timespan;
LogInfo("IoTHubTest_ListenForEvent: Waiting for message_received.");
while ((nowExecutionTime = time(NULL)), timespan = difftime(nowExecutionTime, beginExecutionTime), timespan < maxDrainTimeInSeconds)
{
connection_dowork(connection);
ThreadAPI_Sleep(10);
if (message_receiver_context.message_received || message_receiver_state_changed_context.state == MESSAGE_RECEIVER_STATE_ERROR)
{
break;
}
}
if (message_receiver_state_changed_context.state == MESSAGE_RECEIVER_STATE_ERROR)
{
LogError("message receiver entered an error state.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else if (!message_receiver_context.message_received)
{
LogError("No message was received, timed out.");
result = IOTHUB_TEST_CLIENT_ERROR;
}
else
{
result = IOTHUB_TEST_CLIENT_OK;
}
messagereceiver_destroy(message_receiver);
}
amqpvalue_destroy(target);
}
amqpvalue_destroy(source);
}
amqpvalue_destroy(filter_set);
}
}
}
link_destroy(link);
session_destroy(session);
connection_destroy(connection);
xio_destroy(sasl_io);
xio_destroy(tls_io);
saslmechanism_destroy(sasl_mechanism_handle);
}
free(receive_address);
}
free(eh_hostname);
}
}
return result;
}