in src/cdi/internal_tx.c [137:370]
static CDI_THREAD TxPayloadThread(void* ptr)
{
CdiConnectionState* con_state_ptr = (CdiConnectionState*)ptr;
// Get a state tracker object for the packetizer.
CdiPacketizerStateHandle packetizer_state_handle = PayloadPacketizerCreate();
if (NULL == packetizer_state_handle) {
CDI_LOG_THREAD(kLogError, "Failed to create packetizer state.");
return 0;
}
// Set this thread to use the connection's log. Can now use CDI_LOG_THREAD() for logging within this thread.
CdiLoggerThreadLogSet(con_state_ptr->log_handle);
EndpointManagerHandle mgr_handle = con_state_ptr->endpoint_manager_handle;
// Register this thread with the Endpoint Manager as being part of this connection.
CdiSignalType notification_signal = EndpointManagerThreadRegister(mgr_handle,
CdiOsThreadGetName(con_state_ptr->payload_thread_id));
CdiSignalType comp_queue_signal = CdiQueueGetPopWaitSignal(con_state_ptr->tx_state.work_req_comp_queue_handle);
CdiSignalType signal_array[2];
signal_array[0] = notification_signal;
signal_array[1] = comp_queue_signal;
// Packets are sent to the endpoint in batches starting with a single packet. The number is doubled with each
// batch. This gives a quick start but as the queue backs up, the larger batch sizes lead to higher efficiency
// per batch.
int batch_size = 1;
// These variable are used only within the scope of the while loop below but they must be declared outside of it
// since their values need to start initialized but not get reinitialized every time through.
bool last_packet = false;
TxPacketWorkRequest* work_request_ptr = NULL;
CdiSinglyLinkedList packet_list;
CdiSinglyLinkedListInit(&packet_list);
// The state machine goes through the states like:
//
// +-----> idle -+
// | |
// | +-------+
// | |
// | +-> work received ->+
// | |
// | +-------------------+
// | |
// | +->+-> get work request ->+
// | | |
// | | +-------------------+
// | | |
// | | +-> packetizing ->+
// | | |
// | +<----------------------+ <-- list of packets to enqueue is incomplete
// | ^ |
// | | +--------------------+ <-- list of packets to enqueue is complete
// | | |
// | | +-> enqueueing ->+
// | | |
// | +-------------------+ <-- not last packet of payload
// | |
// +----------------------+ <-- last packet of the payload has been successfully queued
enum {
kPayloadStateIdle, // No payload is in process: wait for payload from queue.
kPayloadStateWorkReceived, // A payload was received to be transmitted: initialize for first packet.
kPayloadStateGetWorkRequest, // Payload and packetizer initialized: get a work request from pool.
kPayloadStatePacketizing, // Have work request: build SGL.
kPayloadStateEnqueuing // Have completed list of work requests: queued to the adapter.
} payload_processing_state = kPayloadStateIdle;
// This loop should only block at the call to CdiQueuePopWaitMultiple(). If a pool runs dry or the output queue is
// full, the logic inside of the loop should maintain enough state to suspend the process of packetizing the current
// payload and resume when resources are available.
TxPayloadState* payload_state_ptr = NULL;
while (!CdiOsSignalGet(con_state_ptr->shutdown_signal) && !EndpointManagerIsConnectionShuttingDown(mgr_handle)) {
uint32_t signal_index = 0;
bool payload_received = false;
if (kPayloadStateIdle == payload_processing_state) {
// Wait for work from the payload queue, the work request complete queue, or a signal from the endpoint
// manager.
payload_received = CdiQueuePopWaitMultiple(con_state_ptr->tx_state.payload_queue_handle, CDI_INFINITE,
signal_array, 2, &signal_index, (void**)&payload_state_ptr);
} else {
// A payload is currently in process. Wait for completion requests or a signal from the Endpoint Manager.
CdiOsSignalsWait(signal_array, 2, false, CDI_INFINITE, &signal_index);
}
if (!payload_received) {
// Either processing an existing payload or did not get a new one. Got a signal from either the Endpoint
// Manager or work_req_comp_queue_handle (the queue contains data).
if (0 == signal_index) {
// Got a notification_signal. The endpoint state has changed, so wait until it has completed.
EndpointManagerThreadWait(mgr_handle);
// An Endpoint Manager state change means that Tx resources have been flushed or queued to be flushed,
// including the current Tx payload that we could be processing. Reset our current payload state back to
// idle. Allow the logic to drop below so if needed ProcessWorkRequestCompletionQueue() is invoked.
payload_processing_state = kPayloadStateIdle;
payload_state_ptr = NULL;
}
} else {
payload_processing_state = kPayloadStateWorkReceived;
// Increment reference counter once at the start of each payload. This will keep the PollThread() working as
// long as we have payloads and their related packets to send.
CdiOsAtomicInc32(&payload_state_ptr->cdi_endpoint_handle->adapter_endpoint_ptr->tx_in_flight_ref_count);
CdiOsSignalSet(con_state_ptr->adapter_connection_ptr->tx_poll_do_work_signal);
}
// Always check the completion queue here. Don't want to starve it in case either several Endpoint Manager
// notifications are received or the payload_queue_handle doesn't go empty.
if (CdiOsSignalReadState(comp_queue_signal)) {
ProcessWorkRequestCompletionQueue(con_state_ptr);
}
// Either resume work on a payload in progress or start a new one.
if (kPayloadStateWorkReceived == payload_processing_state) {
// No packet was in progress so start by initializing for the first one.
// Increment payload number. NOTE: This is done here on the read side of the queue rather than on the write
// side of the queue because the write side fails if the queue is full. This would cause payload_num to
// increment erroneously. By incrementing here on the read side, this problem is avoided.
payload_state_ptr->payload_packet_state.payload_num =
GetNextPayloadNum(payload_state_ptr->cdi_endpoint_handle);
if (CdiLogComponentIsEnabled(con_state_ptr, kLogComponentPayloadConfig)) {
// Dump payload configuration to log or stdout.
DumpPayloadConfiguration(&payload_state_ptr->app_payload_cb_data.core_extra_data,
payload_state_ptr->app_payload_cb_data.extra_data_size,
payload_state_ptr->app_payload_cb_data.extra_data_array,
con_state_ptr->protocol_type);
}
// Prepare packetizer for first packet.
PayloadPacketizerStateInit(packetizer_state_handle);
CdiSinglyLinkedListInit(&packet_list);
batch_size = 1;
last_packet = false;
payload_processing_state = kPayloadStateGetWorkRequest; // Advance the state machine.
}
bool keep_going = kPayloadStateGetWorkRequest == payload_processing_state ||
kPayloadStatePacketizing == payload_processing_state ||
kPayloadStateEnqueuing == payload_processing_state;
while (keep_going) {
// When the connection goes down, no need to use resources to continue creating packets or adding them to
// the adapter's queue. If the adapter's queue gets full it will start generating queue full log message
// errors.
AdapterEndpointHandle adapter_endpoint_handle =
EndpointManagerEndpointToAdapterEndpoint(payload_state_ptr->cdi_endpoint_handle);
if (kCdiConnectionStatusConnected != adapter_endpoint_handle->connection_status_code) {
break;
}
if (kPayloadStateGetWorkRequest == payload_processing_state) {
// NOTE: This pool is not thread-safe, so must ensure that only one thread is accessing it at a time.
if (!CdiPoolGet(con_state_ptr->tx_state.work_request_pool_handle, (void**)&work_request_ptr)) {
keep_going = false;
} else {
payload_processing_state = kPayloadStatePacketizing;
}
}
if (kPayloadStatePacketizing == payload_processing_state) {
// NOTE: These pools are not thread-safe, so must ensure that only one thread is accessing them at a
// time.
if (!PayloadPacketizerPacketGet(adapter_endpoint_handle->protocol_handle,
packetizer_state_handle, (char*)&work_request_ptr->header,
con_state_ptr->tx_state.packet_sgl_entry_pool_handle,
payload_state_ptr, &work_request_ptr->packet.sg_list, &last_packet))
{
// Pool is empty; suspend processing the payload for now, retry after resources are freed.
keep_going = false;
} else {
#ifdef DEBUG_TX_PACKET_SGL_ENTRIES
DebugTxPacketSglEntries(work_request_ptr);
#endif
// Fill in the work request with the specifics of the packet.
work_request_ptr->payload_state_ptr = payload_state_ptr;
work_request_ptr->payload_num = payload_state_ptr->payload_packet_state.payload_num;
work_request_ptr->packet_payload_size =
payload_state_ptr->payload_packet_state.packet_payload_data_size;
// This pointer will be used later by TxPacketWorkRequestComplete() to get access to
// work_request_ptr (a pointer to a TxPacketWorkRequest structure).
work_request_ptr->packet.sg_list.internal_data_ptr = work_request_ptr;
// Set flag for last packet of the payload so ACKs received can keep track of the number of
// in-flight payloads.
work_request_ptr->packet.payload_last_packet = last_packet;
// Add the packet to a list to be enqueued to the adapter.
CdiSinglyLinkedListPushTail(&packet_list, &work_request_ptr->packet.list_entry);
// Increment reference counter once for each packet.
CdiOsAtomicInc32(&payload_state_ptr->cdi_endpoint_handle->adapter_endpoint_ptr->tx_in_flight_ref_count);
payload_processing_state = (last_packet || CdiSinglyLinkedListSize(&packet_list) >= batch_size) ?
kPayloadStateEnqueuing : kPayloadStateGetWorkRequest;
}
}
if (kPayloadStateEnqueuing == payload_processing_state) {
// Enqueue packets. packet_list is copied so it can simply be initialized here to start fresh.
if (kCdiStatusOk != CdiAdapterEnqueueSendPackets(
EndpointManagerEndpointToAdapterEndpoint(payload_state_ptr->cdi_endpoint_handle),
&packet_list)) {
keep_going = false;
} else {
CdiSinglyLinkedListInit(&packet_list);
batch_size *= 2;
if (last_packet) {
// The last packet of the payload has been sent; reset to start a new one.
payload_processing_state = kPayloadStateIdle;
payload_state_ptr = NULL;
keep_going = false;
// Successfully put all packets for a payload into Tx queue, so reset the back pressure state.
con_state_ptr->back_pressure_state = kCdiBackPressureNone;
} else {
payload_processing_state = kPayloadStateGetWorkRequest;
}
}
}
}
}
PayloadPacketizerDestroy(packetizer_state_handle);
if (EndpointManagerIsConnectionShuttingDown(mgr_handle)) {
// Since this thread was registered with the Endpoint Manager using EndpointManagerThreadRegister(), need to
// wait for the Endpoint Manager to complete the shutdown.
EndpointManagerThreadWait(mgr_handle);
}
return 0; // Return code not used.
}