in src/cdi/internal_rx.c [867:1020]
/**
 * Process one received packet for an endpoint: decode its header, locate (or start) the payload it belongs to, fold
 * the packet data into that payload (via the packet re-orderer in SGL mode or by copying in linear buffer mode),
 * finalize the payload once enough bytes have arrived, and then either hand the packet's SGL over to the payload's
 * memory state or return it to the adapter.
 *
 * Except for the early return taken when the endpoint has no protocol handle, every path through this function ends
 * by incrementing the endpoint's total Rx packet counter and calling RxReorderPayloadSendReadyPayloads().
 *
 * @param param_ptr    Pointer that is cast to CdiEndpointState*; identifies the endpoint the packet arrived on.
 * @param packet_ptr   Received packet. The first entry of its sg_list is passed to the header decoder. Its sg_list is
 *                     either moved into the payload's endpoint_packet_buffer_sgl or freed back to the adapter before
 *                     this function returns.
 * @param message_type Expected to be kEndpointMessageTypePacketReceived. Only checked by an assert.
 */
void RxPacketReceive(void* param_ptr, Packet* packet_ptr, EndpointMessageType message_type)
{
assert(kEndpointMessageTypePacketReceived == message_type);
// message_type is only referenced by the assert above, so mark it used for builds where assert() compiles to nothing.
(void)message_type;
CdiEndpointState* endpoint_ptr = (CdiEndpointState*)param_ptr;
CdiConnectionState* con_state_ptr = endpoint_ptr->connection_state_ptr;
// Tracks whether the packet is still being processed normally. Once false, the remaining stages are skipped and the
// packet's SGL is returned to the adapter at the end.
bool still_ok = true;
CdiProtocolHandle protocol_handle = endpoint_ptr->adapter_endpoint_ptr->protocol_handle;
if (NULL == protocol_handle) {
CDI_LOG_THREAD(kLogError, "Connection[%s] Received packet but no protocol defined to decode it. ",
con_state_ptr->saved_connection_name_str);
// Free the buffer and return. No need to flow through all the logic below.
// Note that this path also skips the total packet counter update and the ready-payload check at the end.
CdiAdapterFreeBuffer(endpoint_ptr->adapter_endpoint_ptr, &packet_ptr->sg_list);
return;
}
// Decode the CDI header from the first SGL entry of the packet. The header struct is zero-initialized first.
// NOTE(review): no result from the decode call is checked here and sgl_head_ptr is dereferenced without a NULL
// check - confirm that the decoder cannot fail and that a received packet always has at least one SGL entry.
CdiDecodedPacketHeader decoded_header = { 0 };
ProtocolPayloadHeaderDecode(protocol_handle, packet_ptr->sg_list.sgl_head_ptr->address_ptr,
packet_ptr->sg_list.sgl_head_ptr->size_in_bytes, &decoded_header);
int payload_num = decoded_header.payload_num;
int packet_sequence_num = decoded_header.packet_sequence_num;
int cdi_header_size = decoded_header.encoded_header_size;
#ifdef DEBUG_PACKET_SEQUENCES
CdiPayloadType payload_type = decoded_header.payload_type;
CDI_LOG_THREAD(kLogInfo, "T[%d] P[%3d] S[%3d] A[%p]", payload_type, payload_num, packet_sequence_num,
packet_ptr->sg_list.sgl_head_ptr->address_ptr);
#endif
// Look up the state object for the payload number carried in the header.
RxPayloadState* payload_state_ptr = RxReorderPayloadStateGet(endpoint_ptr,
con_state_ptr->rx_state.rx_payload_state_pool_handle,
payload_num);
if (NULL == payload_state_ptr) {
// No state available for this payload. The packet is not processed; its buffer is freed below.
still_ok = false;
} else {
// Should never be here in the error state. The error state is only set in the logic below and then changed to
// ignore before this function exits.
assert(kPayloadError != payload_state_ptr->payload_state);
// No need to check if this is already set. If this code is being reached a first payload has been received.
con_state_ptr->rx_state.received_first_payload = true;
// If we get a packet for a completed payload, issue a warning, and then set the suspend_warnings flag so that
// we don't keep issuing warnings if we get more packets for this same payload before it is sent to the
// application.
if (!payload_state_ptr->suspend_warnings && (kPayloadComplete == payload_state_ptr->payload_state)) {
CDI_LOG_THREAD(kLogWarning, "Connection[%s] Received packet for completed payload[%d]. Additional packets "
"for this payload will be dropped.",
con_state_ptr->saved_connection_name_str, payload_num);
payload_state_ptr->suspend_warnings = true;
}
// If we have received a packet for a payload that is marked ignore, we will ignore incoming packets for it
// until we have received CDI_MAX_RX_PACKET_OUT_OF_ORDER_WINDOW packets since the payload was set to ignore.
if (kPayloadIgnore == payload_state_ptr->payload_state &&
RxReorderPayloadIsStale(endpoint_ptr, payload_state_ptr)) {
// Payload state data is stale, so ok to re-use it now.
RxReorderPayloadResetState(payload_state_ptr, payload_num);
}
// This will be true while processing of the packet proceeds normally. The packet ignore and error states are
// considered abnormal in the respect that the packet does not undergo the normal processing. Any allocated
// resources coming into the function and allocated along the way must be passed on or freed at the end.
// The state is read here, after the possible reset above, so a stale ignored payload that was just reset is
// evaluated using its post-reset state.
still_ok = (kPayloadIdle == payload_state_ptr->payload_state ||
kPayloadInProgress == payload_state_ptr->payload_state ||
kPayloadPacketZeroPending == payload_state_ptr->payload_state);
}
// Check if we are receiving a new payload.
CdiMemoryState* payload_memory_state_ptr = NULL;
if (still_ok) {
if (kPayloadIdle == payload_state_ptr->payload_state) {
// Create state data for a new payload.
// The payload memory state pointer is returned through the last argument.
still_ok = InitializePayloadState(protocol_handle, endpoint_ptr, packet_ptr, payload_state_ptr,
&decoded_header, &payload_memory_state_ptr);
} else {
// Payload was started by a packet other than sequence number zero; now that packet zero has arrived, update
// the payload state from its header.
if (kPayloadPacketZeroPending == payload_state_ptr->payload_state &&
0 == packet_sequence_num) {
UpdatePayloadStateDataFromCDIPacket0(payload_state_ptr, &decoded_header);
}
// Using state data for an existing in progress payload.
payload_memory_state_ptr = payload_state_ptr->work_request_state.payload_memory_state_ptr;
if (kCdiSgl == con_state_ptr->rx_state.config_data.rx_buffer_type) {
// Send the Rx packet SGL to the packet re-orderer. It will determine if the entry was used or cached.
// The packet reordering logic does not need to be invoked if the connection was configured for a linear
// receive buffer.
still_ok = RxReorderPacket(protocol_handle, con_state_ptr->rx_state.payload_sgl_entry_pool_handle,
con_state_ptr->rx_state.reorder_entries_pool_handle, payload_state_ptr,
&packet_ptr->sg_list, cdi_header_size, packet_sequence_num);
}
}
}
// In linear buffer mode the copy is done for packets of both new and already in progress payloads.
if (still_ok && kCdiLinearBuffer == con_state_ptr->rx_state.config_data.rx_buffer_type) {
assert(NULL != payload_state_ptr->linear_buffer_ptr);
// Gather this packet into the linear receive buffer.
still_ok = CopyToLinearBuffer(con_state_ptr, packet_ptr, payload_state_ptr, &decoded_header);
}
// payload_state_ptr can be NULL here (state lookup failed), hence the explicit pointer check. Only payloads that
// are in progress or waiting for packet zero are moved to the error state.
// NOTE(review): this check runs before FinalizePayload() below, so a failure returned by FinalizePayload() does not
// pass through RxReorderPayloadError() - confirm that is intended.
if (!still_ok && payload_state_ptr &&
((kPayloadInProgress == payload_state_ptr->payload_state) ||
(kPayloadPacketZeroPending == payload_state_ptr->payload_state))) {
// An error occurred so set payload error.
RxReorderPayloadError(endpoint_ptr, payload_state_ptr);
}
// still_ok being true implies payload_state_ptr is non-NULL, so the dereferences below are safe.
if (still_ok && kPayloadInProgress == payload_state_ptr->payload_state &&
payload_state_ptr->data_bytes_received >= payload_state_ptr->expected_payload_data_size) {
// The entire payload has been received, so finalize it and add it to the payload reordering list in the correct
// order.
still_ok = FinalizePayload(con_state_ptr, payload_state_ptr);
// The payload is marked complete whether or not FinalizePayload() succeeded.
payload_state_ptr->payload_state = kPayloadComplete;
if (still_ok) {
if (kCdiBackPressureNone != con_state_ptr->back_pressure_state) {
// Successfully received a payload and had back pressure. In order to prevent Rx payload reorder logic
// from waiting for a payload that may have been thrown away, advance the current window index to the
// first payload.
RxReorderPayloadSeekFirstPayload(endpoint_ptr);
con_state_ptr->back_pressure_state = kCdiBackPressureNone; // Reset back pressure state.
}
}
}
// Decide what to do with the incoming packet's SGL.
if (still_ok && kCdiSgl == con_state_ptr->rx_state.config_data.rx_buffer_type) {
// In SGL mode (SGL packet buffer is being directly used). Append the head of Rx packet SGL list to the tail
// of the endpoint buffer SGL list. This will append the entire list to the buffer SGL. This list is used
// later to free the buffers in the adapter via the application's call to CdiCoreRxFreeBuffer(), which uses
// CdiAdapterFreeBuffer().
// NOTE: The size of the endpoint SGL list is updated in SglMoveEntries().
SglMoveEntries(&payload_memory_state_ptr->endpoint_packet_buffer_sgl, &packet_ptr->sg_list);
} else {
// The SGL passed in to the function was not consumed. Send it back to the adapter now.
// This branch is taken both when processing failed and when the connection uses a linear receive buffer (the
// packet data was already copied above).
CdiAdapterFreeBuffer(endpoint_ptr->adapter_endpoint_ptr, &packet_ptr->sg_list);
}
if (still_ok) {
// Record the endpoint's total packet count as of this packet and count the packet against its payload.
payload_state_ptr->last_total_packet_count = endpoint_ptr->rx_state.total_packet_count;
payload_state_ptr->packet_count++;
// Packet is ok (no errors), so increment Rx reorder buffered packet counter.
endpoint_ptr->rx_state.rxreorder_buffered_packet_count++;
} else if (kCdiBackPressureActive == con_state_ptr->back_pressure_state) {
// Packet was not processed normally while back pressure is active; queue a back pressure payload to the
// application using the decoded header.
QueueBackPressurePayloadToApp(con_state_ptr, endpoint_ptr, &decoded_header);
}
// Always increment total Rx packet counter (packet was actually received) and check if any payloads are ready to
// send.
endpoint_ptr->rx_state.total_packet_count++;
RxReorderPayloadSendReadyPayloads(endpoint_ptr);
}