in src/session.c [395:743]
static void on_frame_received(void* context, AMQP_VALUE performative, uint32_t payload_size, const unsigned char* payload_bytes)
{
SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)context;
AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);
if (is_begin_type_by_descriptor(descriptor))
{
BEGIN_HANDLE begin_handle;
if (amqpvalue_get_begin(performative, &begin_handle) != 0)
{
connection_close(session_instance->connection, "amqp:decode-error", "Cannot decode BEGIN frame", NULL);
}
else
{
if ((begin_get_incoming_window(begin_handle, &session_instance->remote_incoming_window) != 0) ||
(begin_get_next_outgoing_id(begin_handle, &session_instance->next_incoming_id) != 0))
{
/* error */
begin_destroy(begin_handle);
session_set_state(session_instance, SESSION_STATE_DISCARDING);
connection_close(session_instance->connection, "amqp:decode-error", "Cannot get incoming windows and next outgoing id", NULL);
}
else
{
begin_destroy(begin_handle);
if (session_instance->session_state == SESSION_STATE_BEGIN_SENT)
{
session_set_state(session_instance, SESSION_STATE_MAPPED);
}
else if(session_instance->session_state == SESSION_STATE_UNMAPPED)
{
session_set_state(session_instance, SESSION_STATE_BEGIN_RCVD);
if (send_begin(session_instance) != 0)
{
connection_close(session_instance->connection, "amqp:internal-error", "Failed sending BEGIN frame", NULL);
session_set_state(session_instance, SESSION_STATE_DISCARDING);
}
else
{
session_set_state(session_instance, SESSION_STATE_MAPPED);
}
}
}
}
}
else if (is_attach_type_by_descriptor(descriptor))
{
const char* name = NULL;
ATTACH_HANDLE attach_handle;
if (amqpvalue_get_attach(performative, &attach_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode ATTACH frame");
}
else
{
role role;
AMQP_VALUE source;
AMQP_VALUE target;
fields properties;
if (attach_get_name(attach_handle, &name) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get link name from ATTACH frame");
}
else if (attach_get_role(attach_handle, &role) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get link role from ATTACH frame");
}
else
{
LINK_ENDPOINT_INSTANCE* link_endpoint;
if (attach_get_source(attach_handle, &source) != 0)
{
source = NULL;
}
if (attach_get_target(attach_handle, &target) != 0)
{
target = NULL;
}
if (attach_get_properties(attach_handle, &properties) != 0)
{
properties = NULL;
}
link_endpoint = find_link_endpoint_by_name(session_instance, name);
if (link_endpoint == NULL)
{
/* new link attach */
if (session_instance->on_link_attached != NULL)
{
LINK_ENDPOINT_HANDLE new_link_endpoint = session_create_link_endpoint(session_instance, name);
if (new_link_endpoint == NULL)
{
end_session_with_error(session_instance, "amqp:internal-error", "Cannot create link endpoint");
}
else if (attach_get_handle(attach_handle, &new_link_endpoint->input_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
}
else
{
new_link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_ATTACHED;
if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, new_link_endpoint, name, role, source, target, properties))
{
remove_link_endpoint(new_link_endpoint);
free_link_endpoint(new_link_endpoint);
new_link_endpoint = NULL;
}
else
{
if (new_link_endpoint->frame_received_callback != NULL)
{
new_link_endpoint->frame_received_callback(new_link_endpoint->callback_context, performative, payload_size, payload_bytes);
}
}
}
}
}
else if (link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING)
{
if (attach_get_handle(attach_handle, &link_endpoint->input_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get input handle from ATTACH frame");
}
else
{
link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_ATTACHED;
if (session_instance->on_link_attached != NULL)
{
if (!session_instance->on_link_attached(session_instance->on_link_attached_callback_context, link_endpoint, name, role, source, target, properties))
{
link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_DETACHING;
}
}
link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
}
}
}
attach_destroy(attach_handle);
}
}
else if (is_detach_type_by_descriptor(descriptor))
{
DETACH_HANDLE detach_handle;
if (amqpvalue_get_detach(performative, &detach_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode DETACH frame");
}
else
{
uint32_t remote_handle;
if (detach_get_handle(detach_handle, &remote_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from DETACH frame");
detach_destroy(detach_handle);
}
else
{
LINK_ENDPOINT_INSTANCE* link_endpoint;
detach_destroy(detach_handle);
link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle);
if (link_endpoint == NULL)
{
end_session_with_error(session_instance, "amqp:session:unattached-handle", "");
}
else
{
if (link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING)
{
link_endpoint->link_endpoint_state = LINK_ENDPOINT_STATE_DETACHING;
link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
}
else
{
/* remove the link endpoint */
remove_link_endpoint(link_endpoint);
free_link_endpoint(link_endpoint);
}
}
}
}
}
else if (is_flow_type_by_descriptor(descriptor))
{
FLOW_HANDLE flow_handle;
if (amqpvalue_get_flow(performative, &flow_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame");
}
else
{
uint32_t remote_handle;
transfer_number flow_next_incoming_id;
uint32_t flow_incoming_window;
if (flow_get_next_incoming_id(flow_handle, &flow_next_incoming_id) != 0)
{
/*
If the next-incoming-id field of the flow frame is not set,
then remote-incomingwindow is computed as follows:
initial-outgoing-id(endpoint) + incoming-window(flow) - next-outgoing-id(endpoint)
*/
flow_next_incoming_id = session_instance->next_outgoing_id;
}
if ((flow_get_next_outgoing_id(flow_handle, &session_instance->next_incoming_id) != 0) ||
(flow_get_incoming_window(flow_handle, &flow_incoming_window) != 0))
{
flow_destroy(flow_handle);
end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode FLOW frame");
}
else
{
LINK_ENDPOINT_INSTANCE* link_endpoint_instance = NULL;
size_t i;
session_instance->remote_incoming_window = flow_next_incoming_id + flow_incoming_window - session_instance->next_outgoing_id;
if (flow_get_handle(flow_handle, &remote_handle) == 0)
{
link_endpoint_instance = find_link_endpoint_by_input_handle(session_instance, remote_handle);
}
flow_destroy(flow_handle);
if (link_endpoint_instance != NULL)
{
if (link_endpoint_instance->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING)
{
link_endpoint_instance->frame_received_callback(link_endpoint_instance->callback_context, performative, payload_size, payload_bytes);
}
}
i = 0;
while ((session_instance->remote_incoming_window > 0) && (i < session_instance->link_endpoint_count))
{
/* notify the caller that it can send here */
if (session_instance->link_endpoints[i]->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING &&
session_instance->link_endpoints[i]->on_session_flow_on != NULL)
{
session_instance->link_endpoints[i]->on_session_flow_on(session_instance->link_endpoints[i]->callback_context);
}
i++;
}
}
}
}
else if (is_transfer_type_by_descriptor(descriptor))
{
TRANSFER_HANDLE transfer_handle;
if (amqpvalue_get_transfer(performative, &transfer_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode TRANSFER frame");
}
else
{
uint32_t remote_handle;
delivery_number delivery_id;
transfer_get_delivery_id(transfer_handle, &delivery_id);
if (transfer_get_handle(transfer_handle, &remote_handle) != 0)
{
transfer_destroy(transfer_handle);
end_session_with_error(session_instance, "amqp:decode-error", "Cannot get handle from TRANSFER frame");
}
else
{
LINK_ENDPOINT_INSTANCE* link_endpoint;
transfer_destroy(transfer_handle);
session_instance->next_incoming_id++;
session_instance->remote_outgoing_window--;
session_instance->incoming_window--;
link_endpoint = find_link_endpoint_by_input_handle(session_instance, remote_handle);
if (link_endpoint == NULL)
{
end_session_with_error(session_instance, "amqp:session:unattached-handle", "");
}
else
{
if (link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING)
{
link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
}
}
if (session_instance->incoming_window == 0)
{
session_instance->incoming_window = session_instance->desired_incoming_window;
send_flow(session_instance);
}
}
}
}
else if (is_disposition_type_by_descriptor(descriptor))
{
uint32_t i;
for (i = 0; i < session_instance->link_endpoint_count; i++)
{
LINK_ENDPOINT_INSTANCE* link_endpoint = session_instance->link_endpoints[i];
if (link_endpoint->link_endpoint_state != LINK_ENDPOINT_STATE_DETACHING)
{
link_endpoint->frame_received_callback(link_endpoint->callback_context, performative, payload_size, payload_bytes);
}
}
}
else if (is_end_type_by_descriptor(descriptor))
{
END_HANDLE end_handle;
if (amqpvalue_get_end(performative, &end_handle) != 0)
{
end_session_with_error(session_instance, "amqp:decode-error", "Cannot decode END frame");
}
else
{
end_destroy(end_handle);
if ((session_instance->session_state != SESSION_STATE_END_RCVD) &&
(session_instance->session_state != SESSION_STATE_DISCARDING))
{
session_set_state(session_instance, SESSION_STATE_END_RCVD);
if (send_end_frame(session_instance, NULL) != 0)
{
/* fatal error */
(void)connection_close(session_instance->connection, "amqp:internal-error", "Cannot send END frame.", NULL);
}
session_set_state(session_instance, SESSION_STATE_DISCARDING);
}
}
}
}