in src/connection.c [814:1108]
/*
 * Handles one received AMQP frame for the connection passed in as context.
 *
 * Parameters:
 *   context        - the CONNECTION_HANDLE this callback operates on (cast from void*).
 *   channel        - channel number the frame was received on.
 *   performative   - the frame's performative; NULL closes the connection with amqp:internal-error.
 *   payload_bytes  - payload that accompanies the performative; forwarded untouched to session endpoints.
 *   payload_size   - number of bytes in payload_bytes.
 *
 * Behavior:
 *   - The tick counter is asked for the current time, stored in last_frame_received_time;
 *     if that fails the connection is closed with amqp:internal-error.
 *   - Nothing else happens unless is_underlying_io_open is set.
 *   - In the START, HDR_SENT, OPEN_PIPE, OC_PIPE, CLOSE_RCVD and END states any frame
 *     results in xio_close being called on the underlying IO.
 *   - In every other state the frame is dispatched by performative type:
 *       OPEN   - validated (channel 0, state, idle timeout, max frame size) and used to advance
 *                the connection state.
 *       CLOSE  - either closes the IO directly or answers with a CLOSE, closes the IO, moves to END
 *                and notifies the close-received subscriber.
 *       BEGIN  - bound to an existing endpoint (by remote channel) or offered to on_new_endpoint.
 *       FLOW / TRANSFER / DISPOSITION / END / ATTACH / DETACH - forwarded to the session endpoint
 *                registered for the incoming channel.
 */
static void on_amqp_frame_received(void* context, uint16_t channel, AMQP_VALUE performative, const unsigned char* payload_bytes, uint32_t payload_size)
{
    CONNECTION_HANDLE connection = (CONNECTION_HANDLE)context;

    if (tickcounter_get_current_ms(connection->tick_counter, &connection->last_frame_received_time) != 0)
    {
        LogError("Cannot get tickcounter value");
        close_connection_with_error(connection, "amqp:internal-error", "cannot get current tick count", NULL);
    }
    else if (connection->is_underlying_io_open)
    {
        switch (connection->connection_state)
        {
        default:
            if (performative == NULL)
            {
                /* Codes_S_R_S_CONNECTION_01_223: [If the on_endpoint_frame_received is called with a NULL performative then the connection shall be closed with the error condition amqp:internal-error and an implementation defined error description.] */
                close_connection_with_error(connection, "amqp:internal-error", "connection_endpoint_frame_received::NULL performative", NULL);
                LogError("connection_endpoint_frame_received::NULL performative");
            }
            else
            {
                AMQP_VALUE descriptor = amqpvalue_get_inplace_descriptor(performative);

                if (connection->is_trace_on == 1)
                {
                    log_incoming_frame(performative);
                }

                if (is_open_type_by_descriptor(descriptor))
                {
                    if (channel != 0)
                    {
                        /* Codes_S_R_S_CONNECTION_01_006: [The open frame can only be sent on channel 0.] */
                        /* Codes_S_R_S_CONNECTION_01_222: [If an Open frame is received in a manner violating the ISO specification, the connection shall be closed with condition amqp:not-allowed and description being an implementation defined string.] */
                        close_connection_with_error(connection, "amqp:not-allowed", "OPEN frame received on a channel that is not 0", NULL);
                        LogError("OPEN frame received on a channel that is not 0");
                    }
                    /* A rejected OPEN must not be processed any further, so the state checks below are
                       only reached for an OPEN that arrived on channel 0. */
                    else if (connection->connection_state == CONNECTION_STATE_OPENED)
                    {
                        /* Codes_S_R_S_CONNECTION_01_239: [If an Open frame is received in the Opened state the connection shall be closed with condition amqp:illegal-state and description being an implementation defined string.] */
                        close_connection_with_error(connection, "amqp:illegal-state", "OPEN frame received in the OPENED state", NULL);
                        LogError("OPEN frame received in the OPENED state");
                    }
                    else if ((connection->connection_state == CONNECTION_STATE_OPEN_SENT) ||
                        (connection->connection_state == CONNECTION_STATE_HDR_EXCH))
                    {
                        OPEN_HANDLE open_handle;

                        if (amqpvalue_get_open(performative, &open_handle) != 0)
                        {
                            /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */
                            /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */
                            close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL);
                            LogError("connection_endpoint_frame_received::failed parsing OPEN frame");
                        }
                        else
                        {
                            /* A failure to read the idle timeout is not treated as an error; the send-frame deadline is simply not recomputed. */
                            if (open_get_idle_time_out(open_handle, &connection->remote_idle_timeout) == 0)
                            {
                                /* since we obtained the remote_idle_timeout, compute at what millisecond we should send the empty frame */
                                connection->remote_idle_timeout_send_frame_millisecond = (milliseconds)(connection->idle_timeout_empty_frame_send_ratio * connection->remote_idle_timeout);
                            }

                            if ((open_get_max_frame_size(open_handle, &connection->remote_max_frame_size) != 0) ||
                                /* Codes_S_R_S_CONNECTION_01_167: [Both peers MUST accept frames of up to 512 (MIN-MAX-FRAME-SIZE) octets.] */
                                (connection->remote_max_frame_size < 512))
                            {
                                /* Codes_S_R_S_CONNECTION_01_143: [If any of the values in the received open frame are invalid then the connection shall be closed.] */
                                /* Codes_S_R_S_CONNECTION_01_220: [The error amqp:invalid-field shall be set in the error.condition field of the CLOSE frame.] */
                                close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing OPEN frame", NULL);
                                LogError("connection_endpoint_frame_received::failed parsing OPEN frame");
                            }
                            else if (connection->connection_state == CONNECTION_STATE_OPEN_SENT)
                            {
                                /* Our OPEN is already out, so receiving the peer's OPEN completes the handshake. */
                                connection_set_state(connection, CONNECTION_STATE_OPENED);
                            }
                            else if (send_open_frame(connection) != 0)
                            {
                                /* HDR_EXCH: we still owed the peer an OPEN and could not send it. */
                                connection_set_state(connection, CONNECTION_STATE_END);
                            }
                            else
                            {
                                connection_set_state(connection, CONNECTION_STATE_OPENED);
                            }

                            open_destroy(open_handle);
                        }
                    }
                    else
                    {
                        /* do nothing for now ... */
                    }
                }
                else if (is_close_type_by_descriptor(descriptor))
                {
                    /* Codes_S_R_S_CONNECTION_01_242: [The connection module shall accept CLOSE frames even if they have extra payload bytes besides the Close performative.] */

                    /* Codes_S_R_S_CONNECTION_01_225: [HDR_RCVD HDR OPEN] */
                    if ((connection->connection_state == CONNECTION_STATE_HDR_RCVD) ||
                        /* Codes_S_R_S_CONNECTION_01_227: [HDR_EXCH OPEN OPEN] */
                        (connection->connection_state == CONNECTION_STATE_HDR_EXCH) ||
                        /* Codes_S_R_S_CONNECTION_01_228: [OPEN_RCVD OPEN *] */
                        (connection->connection_state == CONNECTION_STATE_OPEN_RCVD) ||
                        /* Codes_S_R_S_CONNECTION_01_235: [CLOSE_SENT - * TCP Close for Write] */
                        (connection->connection_state == CONNECTION_STATE_CLOSE_SENT) ||
                        /* Codes_S_R_S_CONNECTION_01_236: [DISCARDING - * TCP Close for Write] */
                        (connection->connection_state == CONNECTION_STATE_DISCARDING))
                    {
                        if (xio_close(connection->io, NULL, NULL) != 0)
                        {
                            LogError("xio_close failed");
                        }
                    }
                    /* Codes_S_R_S_CONNECTION_01_012: [A close frame MAY be received on any channel up to the maximum channel number negotiated in open.] */
                    else if (channel > connection->channel_max)
                    {
                        close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL);
                        LogError("connection_endpoint_frame_received::failed parsing CLOSE frame");
                    }
                    else
                    {
                        CLOSE_HANDLE close_handle;

                        if (amqpvalue_get_close(performative, &close_handle) != 0)
                        {
                            close_connection_with_error(connection, "amqp:invalid-field", "connection_endpoint_frame_received::failed parsing CLOSE frame", NULL);
                            LogError("connection_endpoint_frame_received::failed parsing CLOSE frame");
                        }
                        else
                        {
                            ERROR_HANDLE error;

                            /* The error is optional: when it cannot be read the subscriber is notified with NULL. */
                            if (close_get_error(close_handle, &error) != 0)
                            {
                                error = NULL;
                            }

                            close_destroy(close_handle);

                            connection_set_state(connection, CONNECTION_STATE_CLOSE_RCVD);

                            /* Reply with our own CLOSE; a failure is only logged because the IO is closed right after anyway. */
                            if (send_close_frame(connection, NULL) != 0)
                            {
                                LogError("Cannot send CLOSE frame");
                            }

                            /* Codes_S_R_S_CONNECTION_01_214: [If the close frame cannot be constructed or sent, the connection shall be closed and set to the END state.] */
                            if (xio_close(connection->io, NULL, NULL) != 0)
                            {
                                LogError("xio_close failed");
                            }

                            connection_set_state(connection, CONNECTION_STATE_END);

                            if (connection->on_connection_close_received_event_subscription.on_connection_close_received != NULL)
                            {
                                connection->on_connection_close_received_event_subscription.on_connection_close_received(connection->on_connection_close_received_event_subscription.context, error);
                            }

                            /* NOTE(review): error may be NULL here; this relies on error_destroy accepting NULL - confirm. */
                            error_destroy(error);
                        }
                    }
                }
                else
                {
                    uint64_t performative_ulong;

                    if (amqpvalue_get_ulong(descriptor, &performative_ulong) != 0)
                    {
                        LogError("Failed getting ulong amqp performative");
                    }
                    else
                    {
                        switch (performative_ulong)
                        {
                        default:
                            LogError("Bad performative: %02x", (unsigned int)performative_ulong);
                            break;

                        case AMQP_BEGIN:
                        {
                            BEGIN_HANDLE begin;

                            if (amqpvalue_get_begin(performative, &begin) != 0)
                            {
                                LogError("Cannot get begin performative");
                            }
                            else
                            {
                                uint16_t remote_channel;

                                if (begin_get_remote_channel(begin, &remote_channel) != 0)
                                {
                                    /* No remote channel could be read from the BEGIN: treat it as a session started by the peer
                                       and offer a fresh endpoint to the new-endpoint listener, if there is one. */
                                    if (connection->on_new_endpoint != NULL)
                                    {
                                        ENDPOINT_HANDLE new_endpoint = connection_create_endpoint(connection);

                                        if (new_endpoint == NULL)
                                        {
                                            /* Never hand a NULL endpoint to the listener or to connection_destroy_endpoint. */
                                            LogError("Cannot create endpoint for incoming BEGIN");
                                        }
                                        else if (!connection->on_new_endpoint(connection->on_new_endpoint_callback_context, new_endpoint))
                                        {
                                            /* The listener declined the endpoint. */
                                            connection_destroy_endpoint(new_endpoint);
                                        }
                                        else
                                        {
                                            /* NOTE(review): assumes the listener has installed on_endpoint_frame_received on the endpoint - confirm. */
                                            new_endpoint->incoming_channel = channel;
                                            new_endpoint->on_endpoint_frame_received(new_endpoint->callback_context, performative, payload_size, payload_bytes);
                                        }
                                    }
                                }
                                else
                                {
                                    /* The BEGIN carries a remote channel: look up the local endpoint whose outgoing channel matches it. */
                                    ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_outgoing_channel(connection, remote_channel);

                                    if (session_endpoint == NULL)
                                    {
                                        LogError("Cannot create session endpoint");
                                    }
                                    else
                                    {
                                        session_endpoint->incoming_channel = channel;
                                        session_endpoint->on_endpoint_frame_received(session_endpoint->callback_context, performative, payload_size, payload_bytes);
                                    }
                                }

                                begin_destroy(begin);
                            }

                            break;
                        }

                        case AMQP_FLOW:
                        case AMQP_TRANSFER:
                        case AMQP_DISPOSITION:
                        case AMQP_END:
                        case AMQP_ATTACH:
                        case AMQP_DETACH:
                        {
                            /* Session-level frames are routed by the channel they arrived on. */
                            ENDPOINT_INSTANCE* session_endpoint = find_session_endpoint_by_incoming_channel(connection, channel);

                            if (session_endpoint == NULL)
                            {
                                LogError("Cannot find session endpoint for channel %u", (unsigned int)channel);
                            }
                            else
                            {
                                session_endpoint->on_endpoint_frame_received(session_endpoint->callback_context, performative, payload_size, payload_bytes);
                            }

                            break;
                        }
                        }
                    }
                }
            }
            break;

        case CONNECTION_STATE_START:
            /* Codes_S_R_S_CONNECTION_01_224: [START HDR HDR] */
        case CONNECTION_STATE_HDR_SENT:
            /* Codes_S_R_S_CONNECTION_01_226: [HDR_SENT OPEN HDR] */
        case CONNECTION_STATE_OPEN_PIPE:
            /* Codes_S_R_S_CONNECTION_01_230: [OPEN_PIPE ** HDR] */
        case CONNECTION_STATE_OC_PIPE:
            /* Codes_S_R_S_CONNECTION_01_232: [OC_PIPE - HDR TCP Close for Write] */
        case CONNECTION_STATE_CLOSE_RCVD:
            /* Codes_S_R_S_CONNECTION_01_234: [CLOSE_RCVD * - TCP Close for Read] */
        case CONNECTION_STATE_END:
            /* Codes_S_R_S_CONNECTION_01_237: [END - - TCP Close] */
            /* No frame is acceptable in any of these states, so the underlying IO is closed. */
            if (xio_close(connection->io, NULL, NULL) != 0)
            {
                LogError("xio_close failed");
            }
            break;
        }
    }
}