in src/session.c [1447:1695]
/*
 * Sends one transfer performative together with its payload chunks on the
 * session that owns link_endpoint.
 *
 * The transfer object is modified: its handle is set to the link endpoint's
 * output handle, its delivery id to the session's next outgoing id and its
 * "more" flag as required for each emitted frame.
 *
 * When the payload does not fit into a single frame (the peer's max frame size
 * minus the encoded performative minus 8 reserved bytes), the payload is split
 * across several frames; on_send_complete is then attached to the last frame only.
 *
 * Parameters:
 *   link_endpoint     - link endpoint to send on; must not be NULL.
 *   transfer          - transfer performative to send; must not be NULL.
 *   payloads          - array of payload_count chunks; may be NULL only when
 *                       payload_count is 0.
 *   payload_count     - number of entries in payloads.
 *   delivery_id       - receives the delivery id assigned to this transfer; must
 *                       not be NULL. It is written before the transfer is encoded,
 *                       so it may hold a value even when an error is returned.
 *   on_send_complete  - passed through to connection_encode_frame.
 *   callback_context  - passed through to connection_encode_frame.
 *
 * Returns:
 *   SESSION_SEND_TRANSFER_OK    - all frames were handed to connection_encode_frame.
 *   SESSION_SEND_TRANSFER_BUSY  - the remote incoming window is 0.
 *   SESSION_SEND_TRANSFER_ERROR - invalid arguments, session not in the MAPPED
 *                                 state, payload size overflow, a frame budget too
 *                                 small for the performative, or any failure while
 *                                 building/encoding a frame. In the multi-frame
 *                                 case earlier frames may already have been
 *                                 encoded when the error is reported.
 */
SESSION_SEND_TRANSFER_RESULT session_send_transfer(LINK_ENDPOINT_HANDLE link_endpoint, TRANSFER_HANDLE transfer, PAYLOAD* payloads, size_t payload_count, delivery_number* delivery_id, ON_SEND_COMPLETE on_send_complete, void* callback_context)
{
    SESSION_SEND_TRANSFER_RESULT result;

    /* Codes_S_R_S_SESSION_01_054: [If link_endpoint or transfer is NULL, session_send_transfer shall fail and return a non-zero value.] */
    if ((link_endpoint == NULL) ||
        (transfer == NULL))
    {
        result = SESSION_SEND_TRANSFER_ERROR;
    }
    else
    {
        LINK_ENDPOINT_INSTANCE* link_endpoint_instance = (LINK_ENDPOINT_INSTANCE*)link_endpoint;
        SESSION_INSTANCE* session_instance = (SESSION_INSTANCE*)link_endpoint_instance->session;

        /* Codes_S_R_S_SESSION_01_059: [When session_send_transfer is called while the session is not in the MAPPED state, session_send_transfer shall fail and return a non-zero value.] */
        if (session_instance->session_state != SESSION_STATE_MAPPED)
        {
            result = SESSION_SEND_TRANSFER_ERROR;
        }
        else
        {
            size_t payload_size = 0;
            size_t i = 0;

            /* Sum the chunk lengths, stopping early on a chunk larger than UINT32_MAX
               or when the running total would wrap around size_t. */
            if (payloads != NULL)
            {
                for (i = 0; i < payload_count; i++)
                {
                    if ((payloads[i].length > UINT32_MAX) ||
                        (payload_size + payloads[i].length < payload_size))
                    {
                        break;
                    }

                    payload_size += payloads[i].length;
                }
            }

            /* i < payload_count means the loop stopped early; it also catches
               payloads == NULL combined with a non-zero payload_count. */
            if ((i < payload_count) ||
                (payload_size > UINT32_MAX))
            {
                result = SESSION_SEND_TRANSFER_ERROR;
            }
            else if (session_instance->remote_incoming_window == 0)
            {
                result = SESSION_SEND_TRANSFER_BUSY;
            }
            else if (delivery_id == NULL)
            {
                /* Codes_S_R_S_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
                /* The assigned delivery id has nowhere to go; fail instead of dereferencing NULL. */
                result = SESSION_SEND_TRANSFER_ERROR;
            }
            else
            {
                /* Codes_S_R_S_SESSION_01_012: [The session endpoint assigns each outgoing transfer frame an implicit transfer-id from a session scoped sequence.] */
                /* Codes_S_R_S_SESSION_01_027: [sending a transfer Upon sending a transfer, the sending endpoint will increment its next-outgoing-id] */
                *delivery_id = session_instance->next_outgoing_id;

                if ((transfer_set_handle(transfer, link_endpoint_instance->output_handle) != 0) ||
                    (transfer_set_delivery_id(transfer, *delivery_id) != 0) ||
                    (transfer_set_more(transfer, false) != 0))
                {
                    /* Codes_S_R_S_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
                    result = SESSION_SEND_TRANSFER_ERROR;
                }
                else
                {
                    AMQP_VALUE transfer_value = amqpvalue_create_transfer(transfer);

                    if (transfer_value == NULL)
                    {
                        /* Codes_S_R_S_SESSION_01_058: [When any other error occurs, session_send_transfer shall fail and return a non-zero value.] */
                        result = SESSION_SEND_TRANSFER_ERROR;
                    }
                    else
                    {
                        uint32_t available_frame_size;
                        size_t encoded_size;

                        if ((connection_get_remote_max_frame_size(session_instance->connection, &available_frame_size) != 0) ||
                            (amqpvalue_get_encoded_size(transfer_value, &encoded_size) != 0))
                        {
                            result = SESSION_SEND_TRANSFER_ERROR;
                        }
                        else if ((available_frame_size < 8) ||
                                 (encoded_size > (size_t)(available_frame_size - 8)))
                        {
                            /* The performative plus the 8 reserved bytes do not fit into one frame.
                               Subtracting anyway would wrap the unsigned budget around to a huge
                               value and let an oversized frame through. */
                            /* NOTE(review): 8 is presumably the fixed-size frame header that precedes
                               the performative - confirm against the framing layer. */
                            result = SESSION_SEND_TRANSFER_ERROR;
                        }
                        else
                        {
                            /* What remains of the frame after the performative and the 8 reserved bytes
                               is the room left for payload bytes. */
                            available_frame_size -= (uint32_t)encoded_size;
                            available_frame_size -= 8;

                            if (available_frame_size >= payload_size)
                            {
                                /* Everything fits into a single frame. */
                                /* Codes_S_R_S_SESSION_01_055: [The encoding of the frame shall be done by calling connection_encode_frame and passing as arguments: the connection handle associated with the session, the transfer performative and the payload chunks passed to session_send_transfer.] */
                                if (connection_encode_frame(session_instance->endpoint, transfer_value, payloads, payload_count, on_send_complete, callback_context) != 0)
                                {
                                    /* Codes_S_R_S_SESSION_01_056: [If connection_encode_frame fails then session_send_transfer shall fail and return a non-zero value.] */
                                    result = SESSION_SEND_TRANSFER_ERROR;
                                }
                                else
                                {
                                    /* Codes_S_R_S_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
                                    session_instance->next_outgoing_id++;
                                    session_instance->remote_incoming_window--;
                                    session_instance->outgoing_window--;

                                    /* Codes_S_R_S_SESSION_01_053: [On success, session_send_transfer shall return 0.] */
                                    result = SESSION_SEND_TRANSFER_OK;
                                }
                            }
                            else if (available_frame_size == 0)
                            {
                                /* There is payload to send but no room for even one payload byte per
                                   frame: splitting would emit empty frames forever without progress. */
                                result = SESSION_SEND_TRANSFER_ERROR;
                            }
                            else
                            {
                                /* Cursor into the caller's payload array: the chunk being consumed and
                                   the offset of the first unsent byte within that chunk. */
                                size_t current_payload_index = 0;
                                uint32_t current_payload_pos = 0;

                                /* break it down into different deliveries */
                                while (payload_size > 0)
                                {
                                    uint32_t transfer_frame_payload_count = 0;
                                    uint32_t current_transfer_frame_payload_size = (uint32_t)payload_size;
                                    uint32_t byte_counter;
                                    size_t temp_current_payload_index = current_payload_index;
                                    uint32_t temp_current_payload_pos = current_payload_pos;
                                    AMQP_VALUE multi_transfer_amqp_value;
                                    PAYLOAD* transfer_frame_payloads;
                                    bool more;

                                    /* Cap this frame's share of the payload at the per-frame budget. */
                                    if (current_transfer_frame_payload_size > available_frame_size)
                                    {
                                        current_transfer_frame_payload_size = available_frame_size;
                                    }

                                    /* The frame that takes all remaining bytes is the last one. */
                                    if (available_frame_size >= payload_size)
                                    {
                                        more = false;
                                    }
                                    else
                                    {
                                        more = true;
                                    }

                                    if (transfer_set_more(transfer, more) != 0)
                                    {
                                        break;
                                    }

                                    multi_transfer_amqp_value = amqpvalue_create_transfer(transfer);
                                    if (multi_transfer_amqp_value == NULL)
                                    {
                                        break;
                                    }

                                    /* Dry run on a copy of the cursor: find the last source chunk this
                                       frame touches so the slice array can be sized. */
                                    byte_counter = current_transfer_frame_payload_size;
                                    while (byte_counter > 0)
                                    {
                                        if (payloads[temp_current_payload_index].length - temp_current_payload_pos >= byte_counter)
                                        {
                                            /* more data than we need */
                                            temp_current_payload_pos += byte_counter;
                                            byte_counter = 0;
                                        }
                                        else
                                        {
                                            byte_counter -= (uint32_t)payloads[temp_current_payload_index].length - temp_current_payload_pos;
                                            temp_current_payload_index++;
                                            temp_current_payload_pos = 0;
                                        }
                                    }

                                    /* Number of slices = chunks spanned, inclusive of the first and last one. */
                                    size_t payload_len = safe_subtract_size_t(temp_current_payload_index, current_payload_index);
                                    payload_len = safe_add_size_t(payload_len, 1);
                                    uint32_t transfer_frame_payload_len = payload_len < UINT32_MAX ? (uint32_t)payload_len : UINT32_MAX;

                                    /* Passing count and element size separately lets calloc reject a
                                       product that does not fit in size_t. */
                                    if (transfer_frame_payload_len == UINT32_MAX ||
                                        (transfer_frame_payloads = (PAYLOAD*)calloc(transfer_frame_payload_len, sizeof(PAYLOAD))) == NULL)
                                    {
                                        amqpvalue_destroy(multi_transfer_amqp_value);
                                        break;
                                    }

                                    /* Fill the slice array; the slices point into the caller's buffers,
                                       no payload bytes are copied. This pass advances the real cursor. */
                                    byte_counter = current_transfer_frame_payload_size;
                                    transfer_frame_payload_count = 0;
                                    while (byte_counter > 0 && transfer_frame_payload_count < transfer_frame_payload_len)
                                    {
                                        if (payloads[current_payload_index].length - current_payload_pos > byte_counter)
                                        {
                                            /* more data than we need */
                                            transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
                                            transfer_frame_payloads[transfer_frame_payload_count].length = byte_counter;
                                            current_payload_pos += byte_counter;
                                            byte_counter = 0;
                                        }
                                        else
                                        {
                                            /* take the rest of this chunk and move to the next */
                                            transfer_frame_payloads[transfer_frame_payload_count].bytes = payloads[current_payload_index].bytes + current_payload_pos;
                                            transfer_frame_payloads[transfer_frame_payload_count].length = payloads[current_payload_index].length - current_payload_pos;
                                            byte_counter -= (uint32_t)payloads[current_payload_index].length - current_payload_pos;
                                            current_payload_index++;
                                            current_payload_pos = 0;
                                        }

                                        transfer_frame_payload_count++;
                                    }

                                    if (connection_encode_frame(session_instance->endpoint, multi_transfer_amqp_value, transfer_frame_payloads, transfer_frame_payload_count,
                                        /* only fire the send complete callback on the last frame of the multi frame transfer */
                                        more ? NULL : on_send_complete,
                                        callback_context) != 0)
                                    {
                                        free(transfer_frame_payloads);
                                        amqpvalue_destroy(multi_transfer_amqp_value);
                                        break;
                                    }

                                    free(transfer_frame_payloads);
                                    amqpvalue_destroy(multi_transfer_amqp_value);

                                    payload_size -= current_transfer_frame_payload_size;
                                }

                                /* Bytes left over mean the loop was abandoned on an error. */
                                if (payload_size > 0)
                                {
                                    result = SESSION_SEND_TRANSFER_ERROR;
                                }
                                else
                                {
                                    /* NOTE(review): the counters below move once per call even though several
                                       frames were emitted - confirm this matches the intended flow control accounting. */
                                    /* Codes_S_R_S_SESSION_01_018: [is incremented after each successive transfer according to RFC-1982 [RFC1982] serial number arithmetic.] */
                                    session_instance->next_outgoing_id++;
                                    session_instance->remote_incoming_window--;
                                    session_instance->outgoing_window--;

                                    result = SESSION_SEND_TRANSFER_OK;
                                }
                            }
                        }

                        amqpvalue_destroy(transfer_value);
                    }
                }
            }
        }
    }

    return result;
}