IOTHUB_TEST_CLIENT_RESULT IoTHubTest_ListenForEvent()

in testtools/iothub_test/src/iothubtest.c [1472:1720]


/*
 * Opens an AMQP receiver (SASL PLAIN over TLS) against the host/address built
 * by CreateReceiveHostName / CreateReceiveAddress and pumps the connection
 * until a message is flagged as received, the message receiver reports an
 * error state, or maxDrainTimeInSeconds has elapsed.
 *
 * Parameters:
 *   devhubHandle          - test handle, used as an IOTHUB_VALIDATION_INFO*; must not be NULL.
 *   msgCallback           - stored in the context passed along with on_message_received;
 *                           must not be NULL.
 *   partitionCount        - forwarded unchanged to CreateReceiveAddress.
 *   context               - opaque pointer stored next to msgCallback in the receiver context.
 *   receiveTimeRangeStart - start time in seconds; the link filter only selects messages whose
 *                           x-opt-enqueuedtimeutc is greater than (start - 330 s) * 1000.
 *   maxDrainTimeInSeconds - maximum wall-clock time spent polling for a message.
 *
 * Returns IOTHUB_TEST_CLIENT_OK only when the receiver context's message_received flag
 * is set before the timeout (NOTE(review): the flag is presumably set by
 * on_message_received, which is defined elsewhere - confirm). Every other outcome
 * (invalid arguments, any setup failure, receiver error state, timeout) returns
 * IOTHUB_TEST_CLIENT_ERROR after logging the reason.
 */
IOTHUB_TEST_CLIENT_RESULT IoTHubTest_ListenForEvent(IOTHUB_TEST_HANDLE devhubHandle, pfIoTHubMessageCallback msgCallback, size_t partitionCount, void* context, time_t receiveTimeRangeStart, double maxDrainTimeInSeconds)
{
    /* Pessimistic default: only the "message received" path reports success. */
    IOTHUB_TEST_CLIENT_RESULT result = IOTHUB_TEST_CLIENT_ERROR;

    if (devhubHandle == NULL || msgCallback == NULL)
    {
        LogError("Invalid parameter given in IoTHubTest_ListenForEvent DevhubHandle: 0x%p\r\nMessage Callback: 0x%p.", devhubHandle, msgCallback);
        result = IOTHUB_TEST_CLIENT_ERROR;
    }
    else
    {
        /* Every handle starts out NULL so the single teardown section below can
           tell which resources were actually acquired. */
        XIO_HANDLE sasl_io = NULL;
        XIO_HANDLE tls_io = NULL;
        SASL_MECHANISM_HANDLE sasl_mechanism_handle = NULL;
        CONNECTION_HANDLE connection = NULL;
        SESSION_HANDLE session = NULL;
        LINK_HANDLE link = NULL;
        IOTHUB_VALIDATION_INFO* devhubValInfo = (IOTHUB_VALIDATION_INFO*)devhubHandle;

        char* eh_hostname = CreateReceiveHostName(devhubValInfo);
        if (eh_hostname == NULL)
        {
            LogError("Failed getting eh_hostname.");
            result = IOTHUB_TEST_CLIENT_ERROR;
        }
        else
        {
            char* receive_address = CreateReceiveAddress(devhubValInfo, partitionCount);
            if (receive_address == NULL)
            {
                LogError("Failed getting receive_address.");
                result = IOTHUB_TEST_CLIENT_ERROR;
            }
            else
            {
                const SASL_MECHANISM_INTERFACE_DESCRIPTION* sasl_plain_interface_description;
                const IO_INTERFACE_DESCRIPTION* tlsio_interface = NULL;

                /* SASL PLAIN credentials: fixed "iothubowner" identity plus the stored access key. */
                SASL_PLAIN_CONFIG sasl_plain_config;
                sasl_plain_config.authcid = "iothubowner";
                sasl_plain_config.passwd = devhubValInfo->eventhubAccessKey;
                sasl_plain_config.authzid = NULL;

                /* Zero-initialised first so that any member not assigned below is
                   not left indeterminate. Port 5671 is the registered AMQP-over-TLS port. */
                TLSIO_CONFIG tls_io_config = { 0 };
                tls_io_config.hostname = eh_hostname;
                tls_io_config.port = 5671;
                tls_io_config.underlying_io_interface = NULL;
                tls_io_config.underlying_io_parameters = NULL;

                if ((sasl_plain_interface_description = saslplain_get_interface()) == NULL)
                {
                    LogError("Failed getting saslplain_get_interface.");
                    result = IOTHUB_TEST_CLIENT_ERROR;
                }
                else if ((sasl_mechanism_handle = saslmechanism_create(sasl_plain_interface_description, &sasl_plain_config)) == NULL)
                {
                    LogError("Failed creating sasl PLAN mechanism.");
                    result = IOTHUB_TEST_CLIENT_ERROR;
                }
                else if ((tlsio_interface = platform_get_default_tlsio()) == NULL)
                {
                    LogError("Failed getting default TLS IO interface.");
                    result = IOTHUB_TEST_CLIENT_ERROR;
                }
                else if ((tls_io = xio_create(tlsio_interface, &tls_io_config)) == NULL)
                {
                    LogError("Failed creating the TLS IO.");
                    result = IOTHUB_TEST_CLIENT_ERROR;
                }
                else
                {
#ifdef SET_TRUSTED_CERT
                    xio_setoption(tls_io, OPTION_TRUSTED_CERT, test_service_cert);
#endif // SET_TRUSTED_CERT

                    /* create the SASL client IO using the TLS IO */
                    LogInfo("IoTHubTest_ListenForEvent: SASL client IO using the TLS IO.");
                    SASLCLIENTIO_CONFIG sasl_io_config = { 0 };
                    sasl_io_config.underlying_io = tls_io;
                    sasl_io_config.sasl_mechanism = sasl_mechanism_handle;

                    if ((sasl_io = xio_create(saslclientio_get_interface_description(), &sasl_io_config)) == NULL)
                    {
                        LogError("Failed creating the SASL IO.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    /* create the connection, session and link */
                    else if ((connection = connection_create(sasl_io, eh_hostname, "e2etest_link", NULL, NULL)) == NULL)
                    {
                        LogError("Failed creating the connection.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else if ((session = session_create(connection, NULL, NULL)) == NULL)
                    {
                        LogError("Failed creating the session.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else if (session_set_incoming_window(session, 100) != 0)
                    {
                        /* set incoming window to 100 for the session */
                        LogError("Failed setting the session incoming window.");
                        result = IOTHUB_TEST_CLIENT_ERROR;
                    }
                    else
                    {
                        char tempBuffer[256];
                        const char filter_name[] = "apache.org:selector-filter:string";

                        /* The filter starts 330 seconds before the requested time
                           (NOTE(review): presumably to tolerate clock skew between the
                           test machine and the service - confirm). Clamp at zero so a
                           start time below 330 cannot wrap around to a huge unsigned value. */
                        unsigned long long filter_start_seconds = (receiveTimeRangeStart > (time_t)330) ? ((unsigned long long)receiveTimeRangeStart - 330) : 0;

                        /* Bounded formatting: a return value >= the buffer size means truncation. */
                        int filter_string_length = snprintf(tempBuffer, sizeof(tempBuffer), "amqp.annotation.x-opt-enqueuedtimeutc > %llu", filter_start_seconds * 1000);
                        if (filter_string_length < 0 || (size_t)filter_string_length >= sizeof(tempBuffer))
                        {
                            LogError("Failed creating filter set with enqueuedtimeutc filter.");
                            result = IOTHUB_TEST_CLIENT_ERROR;
                        }
                        else
                        {
                            LogInfo("IoTHubTest_ListenForEvent: Create AMQP filter.");
                            /* create the filter set to be used for the source of the link */
                            filter_set filters = amqpvalue_create_map();
                            AMQP_VALUE filter_key = amqpvalue_create_symbol(filter_name);
                            AMQP_VALUE descriptor = amqpvalue_create_symbol(filter_name);
                            AMQP_VALUE filter_value = amqpvalue_create_string(tempBuffer);
                            AMQP_VALUE described_filter_value = NULL;
                            int filter_is_ready = 0;

                            /* Validate every piece before it is used, instead of checking the
                               map only after it was already passed to amqpvalue_set_map_value. */
                            if (filters != NULL && filter_key != NULL && descriptor != NULL && filter_value != NULL)
                            {
                                described_filter_value = amqpvalue_create_described(descriptor, filter_value);
                                if (described_filter_value != NULL)
                                {
                                    /* NOTE(review): on success the described value is assumed to own
                                       descriptor and filter_value (they were never destroyed separately
                                       before this change) - confirm against the uAMQP implementation. */
                                    descriptor = NULL;
                                    filter_value = NULL;
                                    filter_is_ready = (amqpvalue_set_map_value(filters, filter_key, described_filter_value) == 0);
                                }
                            }

                            /* Release the local references; the map is kept until the link work is done. */
                            if (described_filter_value != NULL)
                            {
                                amqpvalue_destroy(described_filter_value);
                            }
                            if (filter_value != NULL)
                            {
                                amqpvalue_destroy(filter_value);
                            }
                            if (descriptor != NULL)
                            {
                                amqpvalue_destroy(descriptor);
                            }
                            if (filter_key != NULL)
                            {
                                amqpvalue_destroy(filter_key);
                            }

                            if (!filter_is_ready)
                            {
                                LogError("Failed creating filter set with enqueuedtimeutc filter.");
                                result = IOTHUB_TEST_CLIENT_ERROR;
                            }
                            else
                            {
                                AMQP_VALUE target = NULL;
                                AMQP_VALUE source = NULL;

                                /* create the source of the link */
                                SOURCE_HANDLE source_handle = source_create();
                                AMQP_VALUE address_value = amqpvalue_create_string(receive_address);
                                if (source_handle != NULL && address_value != NULL &&
                                    source_set_address(source_handle, address_value) == 0 &&
                                    source_set_filter(source_handle, filters) == 0)
                                {
                                    source = amqpvalue_create_source(source_handle);
                                }
                                if (address_value != NULL)
                                {
                                    amqpvalue_destroy(address_value);
                                }
                                if (source_handle != NULL)
                                {
                                    source_destroy(source_handle);
                                }

                                if (source == NULL)
                                {
                                    LogError("Failed creating source for link.");
                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                }
                                else
                                {
                                    target = messaging_create_target(receive_address);
                                    if (target == NULL)
                                    {
                                        LogError("Failed creating target for link.");
                                        result = IOTHUB_TEST_CLIENT_ERROR;
                                    }
                                    else
                                    {
                                        if ((link = link_create(session, "receiver-link", role_receiver, source, target)) == NULL)
                                        {
                                            LogError("Failed creating link.");
                                            result = IOTHUB_TEST_CLIENT_ERROR;
                                        }
                                        else if (link_set_rcv_settle_mode(link, receiver_settle_mode_first) != 0)
                                        {
                                            LogError("Failed setting link receive settle mode.");
                                            result = IOTHUB_TEST_CLIENT_ERROR;
                                        }
                                        else
                                        {
                                            LogInfo("IoTHubTest_ListenForEvent: Create message receiver.");

                                            /* Both contexts live on this stack frame; the receiver that
                                               refers to them is destroyed before leaving this scope. */
                                            MESSAGE_RECEIVER_CONTEXT message_receiver_context;
                                            message_receiver_context.msgCallback = msgCallback;
                                            message_receiver_context.context = context;
                                            message_receiver_context.message_received = false;

                                            MESSAGE_RECEIVER_STATE_CHANGED_CONTEXT message_receiver_state_changed_context = {0};

                                            /* create a message receiver */
                                            MESSAGE_RECEIVER_HANDLE message_receiver = messagereceiver_create(link, on_message_receiver_state_changed, &message_receiver_state_changed_context);
                                            if (message_receiver == NULL)
                                            {
                                                LogError("Failed creating message receiver.");
                                                result = IOTHUB_TEST_CLIENT_ERROR;
                                            }
                                            else
                                            {
                                                if (messagereceiver_open_with_retry(message_receiver, on_message_received, &message_receiver_context) != 0)
                                                {
                                                    LogError("Failed opening message receiver.");
                                                    result = IOTHUB_TEST_CLIENT_ERROR;
                                                }
                                                else
                                                {
                                                    const time_t beginExecutionTime = time(NULL);

                                                    LogInfo("IoTHubTest_ListenForEvent: Waiting for message_received.");
                                                    /* Pump the connection in 10 ms steps until a message arrives,
                                                       the receiver fails, or the drain time is used up. */
                                                    while (difftime(time(NULL), beginExecutionTime) < maxDrainTimeInSeconds)
                                                    {
                                                        connection_dowork(connection);
                                                        ThreadAPI_Sleep(10);

                                                        if (message_receiver_context.message_received || message_receiver_state_changed_context.state == MESSAGE_RECEIVER_STATE_ERROR)
                                                        {
                                                            break;
                                                        }
                                                    }

                                                    /* An error state takes precedence over a received message. */
                                                    if (message_receiver_state_changed_context.state == MESSAGE_RECEIVER_STATE_ERROR)
                                                    {
                                                        LogError("message receiver entered an error state.");
                                                        result = IOTHUB_TEST_CLIENT_ERROR;
                                                    }
                                                    else if (!message_receiver_context.message_received)
                                                    {
                                                        LogError("No message was received, timed out.");
                                                        result = IOTHUB_TEST_CLIENT_ERROR;
                                                    }
                                                    else
                                                    {
                                                        result = IOTHUB_TEST_CLIENT_OK;
                                                    }
                                                }

                                                /* Single release point for both the open-failure and the normal path. */
                                                messagereceiver_destroy(message_receiver);
                                            }
                                        }

                                        /* Released on every path once created; previously this leaked
                                           when link creation or link configuration failed. */
                                        amqpvalue_destroy(target);
                                    }
                                    amqpvalue_destroy(source);
                                }
                            }

                            if (filters != NULL)
                            {
                                amqpvalue_destroy(filters);
                            }
                        }
                    }
                }

                /* Teardown in reverse order of creation. It runs for every outcome of the
                   setup chain above, so a failure after saslmechanism_create no longer
                   leaks the SASL mechanism, and no destroy function is handed NULL. */
                if (link != NULL)
                {
                    link_destroy(link);
                }
                if (session != NULL)
                {
                    session_destroy(session);
                }
                if (connection != NULL)
                {
                    connection_destroy(connection);
                }
                if (sasl_io != NULL)
                {
                    xio_destroy(sasl_io);
                }
                if (tls_io != NULL)
                {
                    xio_destroy(tls_io);
                }
                if (sasl_mechanism_handle != NULL)
                {
                    saslmechanism_destroy(sasl_mechanism_handle);
                }

                free(receive_address);
            }

            free(eh_hostname);
        }
    }

    return result;
}