in iothub_client/src/iothubtransportamqp_methods.c [592:764]
/*
 * Subscribes the methods handle to method requests on the given AMQP session.
 *
 * Steps, in order:
 *   1. builds the peer endpoint name and stores the supplied callbacks and
 *      their contexts on the handle;
 *   2. creates the receiver link (source = peer endpoint, target = "requests");
 *   3. creates the sender link (source = "responses", target = peer endpoint);
 *   4. applies the link attach properties;
 *   5. creates the message receiver and message sender, then opens the sender
 *      followed by the receiver;
 *   6. marks the handle as SUBSCRIBE_STATE_SUBSCRIBED.
 *
 * Parameters:
 *   iothubtransport_amqp_methods_handle  handle to subscribe; must not be NULL
 *                                        and must be in SUBSCRIBE_STATE_NOT_SUBSCRIBED.
 *   session_handle                       session the two links are created on; must not be NULL.
 *   on_methods_error / _context          error callback (required) and its context (may be NULL).
 *   on_method_request_received / _context
 *                                        request callback (required) and its context (may be NULL).
 *   on_methods_unsubscribed / _context   unsubscribed callback (required) and its context (may be NULL).
 *
 * Returns 0 on success, a non-zero value (MU_FAILURE) otherwise.
 *
 * On failure every link / sender / receiver created by this call is destroyed
 * and the corresponding handle field is reset to NULL, so that a later retry
 * does not overwrite (and leak) live objects. The callbacks stored in step 1
 * are left on the handle, as before.
 */
int iothubtransportamqp_methods_subscribe(IOTHUBTRANSPORT_AMQP_METHODS_HANDLE iothubtransport_amqp_methods_handle,
    SESSION_HANDLE session_handle, ON_METHODS_ERROR on_methods_error, void* on_methods_error_context,
    ON_METHOD_REQUEST_RECEIVED on_method_request_received, void* on_method_request_received_context,
    ON_METHODS_UNSUBSCRIBED on_methods_unsubscribed, void* on_methods_unsubscribed_context)
{
    int result;

    /* Temporaries owned by this function; NULL means "not created (yet)". */
    STRING_HANDLE peer_endpoint_string = NULL;
    STRING_HANDLE requests_link_name = NULL;
    STRING_HANDLE responses_link_name = NULL;
    AMQP_VALUE receiver_source = NULL;
    AMQP_VALUE receiver_target = NULL;
    AMQP_VALUE sender_source = NULL;
    AMQP_VALUE sender_target = NULL;

    if ((iothubtransport_amqp_methods_handle == NULL) ||
        (session_handle == NULL) ||
        (on_methods_error == NULL) ||
        (on_method_request_received == NULL) ||
        (on_methods_unsubscribed == NULL))
    {
        LogError("Invalid arguments: iothubtransport_amqp_methods_handle=%p, session_handle=%p, on_methods_error=%p, on_method_request_received=%p, on_methods_unsubscribed=%p",
            iothubtransport_amqp_methods_handle, session_handle, on_methods_error, on_method_request_received, on_methods_unsubscribed);
        result = MU_FAILURE;
        goto done;
    }

    if (iothubtransport_amqp_methods_handle->subscribe_state != SUBSCRIBE_STATE_NOT_SUBSCRIBED)
    {
        LogError("Already subscribed");
        result = MU_FAILURE;
        goto done;
    }

    peer_endpoint_string = create_peer_endpoint_name(iothubtransport_amqp_methods_handle);
    if (peer_endpoint_string == NULL)
    {
        result = MU_FAILURE;
        goto done;
    }

    /* The callbacks must be in place before the sender/receiver are created,
       because the handle is passed to them as the callback context. */
    iothubtransport_amqp_methods_handle->on_method_request_received = on_method_request_received;
    iothubtransport_amqp_methods_handle->on_method_request_received_context = on_method_request_received_context;
    iothubtransport_amqp_methods_handle->on_methods_error = on_methods_error;
    iothubtransport_amqp_methods_handle->on_methods_error_context = on_methods_error_context;
    iothubtransport_amqp_methods_handle->on_methods_unsubscribed = on_methods_unsubscribed;
    iothubtransport_amqp_methods_handle->on_methods_unsubscribed_context = on_methods_unsubscribed_context;

    /* ---- receiver link: peer endpoint -> "requests" ---- */
    receiver_source = messaging_create_source(STRING_c_str(peer_endpoint_string));
    if (receiver_source == NULL)
    {
        LogError("Cannot create receiver source");
        result = MU_FAILURE;
        goto release_locals;
    }

    receiver_target = messaging_create_target("requests");
    if (receiver_target == NULL)
    {
        LogError("Cannot create receiver target");
        result = MU_FAILURE;
        goto release_locals;
    }

    requests_link_name = create_requests_link_name(iothubtransport_amqp_methods_handle);
    if (requests_link_name == NULL)
    {
        LogError("Cannot create methods requests link name.");
        result = MU_FAILURE;
        goto release_locals;
    }

    iothubtransport_amqp_methods_handle->receiver_link = link_create(session_handle, STRING_c_str(requests_link_name), role_receiver, receiver_source, receiver_target);
    if (iothubtransport_amqp_methods_handle->receiver_link == NULL)
    {
        LogError("Cannot create receiver link");
        result = MU_FAILURE;
        goto release_locals;
    }

    /* ---- sender link: "responses" -> peer endpoint ---- */
    sender_source = messaging_create_source("responses");
    if (sender_source == NULL)
    {
        LogError("Cannot create sender source");
        result = MU_FAILURE;
        goto rollback_receiver_link;
    }

    sender_target = messaging_create_target(STRING_c_str(peer_endpoint_string));
    if (sender_target == NULL)
    {
        LogError("Cannot create sender target");
        result = MU_FAILURE;
        goto rollback_receiver_link;
    }

    responses_link_name = create_responses_link_name(iothubtransport_amqp_methods_handle);
    if (responses_link_name == NULL)
    {
        LogError("Cannot create methods responses link name.");
        result = MU_FAILURE;
        goto rollback_receiver_link;
    }

    iothubtransport_amqp_methods_handle->sender_link = link_create(session_handle, STRING_c_str(responses_link_name), role_sender, sender_source, sender_target);
    if (iothubtransport_amqp_methods_handle->sender_link == NULL)
    {
        LogError("Cannot create sender link");
        result = MU_FAILURE;
        goto rollback_receiver_link;
    }

    if (set_link_attach_properties(iothubtransport_amqp_methods_handle) != 0)
    {
        result = MU_FAILURE;
        goto rollback_sender_link;
    }

    /* ---- message receiver / sender on top of the two links ---- */
    iothubtransport_amqp_methods_handle->message_receiver = messagereceiver_create(iothubtransport_amqp_methods_handle->receiver_link, on_message_receiver_state_changed, iothubtransport_amqp_methods_handle);
    if (iothubtransport_amqp_methods_handle->message_receiver == NULL)
    {
        LogError("Cannot create message receiver");
        result = MU_FAILURE;
        goto rollback_sender_link;
    }

    iothubtransport_amqp_methods_handle->message_sender = messagesender_create(iothubtransport_amqp_methods_handle->sender_link, on_message_sender_state_changed, iothubtransport_amqp_methods_handle);
    if (iothubtransport_amqp_methods_handle->message_sender == NULL)
    {
        LogError("Cannot create message sender");
        result = MU_FAILURE;
        goto rollback_message_receiver;
    }

    if (messagesender_open(iothubtransport_amqp_methods_handle->message_sender) != 0)
    {
        LogError("Cannot open the message sender");
        result = MU_FAILURE;
        goto rollback_message_sender;
    }

    if (messagereceiver_open(iothubtransport_amqp_methods_handle->message_receiver, on_message_received, iothubtransport_amqp_methods_handle) != 0)
    {
        LogError("Cannot open the message receiver");
        result = MU_FAILURE;
        goto rollback_message_sender;
    }

    iothubtransport_amqp_methods_handle->subscribe_state = SUBSCRIBE_STATE_SUBSCRIBED;
    result = 0;
    goto release_locals;

    /*
     * Failure rollback, in reverse order of creation. Each label is only
     * reached once the object it destroys has been successfully created by
     * this call, so stale field values are never touched.
     *
     * NOTE(review): destroying an opened sender is expected to close it first
     * (uAMQP's messagesender_destroy does so), which may invoke
     * on_message_sender_state_changed - confirm that handler tolerates being
     * called while subscribe_state is still SUBSCRIBE_STATE_NOT_SUBSCRIBED.
     */
rollback_message_sender:
    messagesender_destroy(iothubtransport_amqp_methods_handle->message_sender);
    iothubtransport_amqp_methods_handle->message_sender = NULL;

rollback_message_receiver:
    messagereceiver_destroy(iothubtransport_amqp_methods_handle->message_receiver);
    iothubtransport_amqp_methods_handle->message_receiver = NULL;

rollback_sender_link:
    link_destroy(iothubtransport_amqp_methods_handle->sender_link);
    iothubtransport_amqp_methods_handle->sender_link = NULL;

rollback_receiver_link:
    link_destroy(iothubtransport_amqp_methods_handle->receiver_link);
    iothubtransport_amqp_methods_handle->receiver_link = NULL;

    /* Temporaries are released on both success and failure, in the same
       order the original code released them. */
release_locals:
    if (responses_link_name != NULL)
    {
        STRING_delete(responses_link_name);
    }
    if (sender_target != NULL)
    {
        amqpvalue_destroy(sender_target);
    }
    if (sender_source != NULL)
    {
        amqpvalue_destroy(sender_source);
    }
    if (requests_link_name != NULL)
    {
        STRING_delete(requests_link_name);
    }
    if (receiver_target != NULL)
    {
        amqpvalue_destroy(receiver_target);
    }
    if (receiver_source != NULL)
    {
        amqpvalue_destroy(receiver_source);
    }
    STRING_delete(peer_endpoint_string);

done:
    return result;
}