static CDI_THREAD TxPayloadThread()

in src/cdi/internal_tx.c [137:370]


/**
 * Thread entry point that packetizes Tx payloads for a connection and enqueues the resulting packets to the adapter.
 *
 * Payloads are popped from the connection's payload queue. Each one is broken into packets using a packetizer state
 * object created by (and destroyed by) this thread, and the packets are handed to CdiAdapterEnqueueSendPackets() in
 * batches. The thread also services the work request completion queue and reacts to Endpoint Manager notifications.
 * It runs until either the connection's shutdown signal is set or the Endpoint Manager reports that the connection is
 * shutting down.
 *
 * @param ptr Pointer to the CdiConnectionState of the connection this thread serves.
 *
 * @return Always 0; the return code is not used.
 */
static CDI_THREAD TxPayloadThread(void* ptr)
{
    CdiConnectionState* con_state_ptr = (CdiConnectionState*)ptr;

    // Set this thread to use the connection's log. Can now use CDI_LOG_THREAD() for logging within this thread. This
    // is done before anything that can fail so the error message below is directed to the connection's log.
    CdiLoggerThreadLogSet(con_state_ptr->log_handle);

    // Get a state tracker object for the packetizer.
    CdiPacketizerStateHandle packetizer_state_handle = PayloadPacketizerCreate();
    if (NULL == packetizer_state_handle) {
        CDI_LOG_THREAD(kLogError, "Failed to create packetizer state.");
        // The thread has not been registered with the Endpoint Manager yet, so there is nothing to wait for here.
        return 0;
    }

    EndpointManagerHandle mgr_handle = con_state_ptr->endpoint_manager_handle;

    // Register this thread with the Endpoint Manager as being part of this connection.
    CdiSignalType notification_signal = EndpointManagerThreadRegister(mgr_handle,
                                            CdiOsThreadGetName(con_state_ptr->payload_thread_id));

    CdiSignalType comp_queue_signal = CdiQueueGetPopWaitSignal(con_state_ptr->tx_state.work_req_comp_queue_handle);

    // Index 0 must remain the Endpoint Manager notification: the main loop tests "0 == signal_index" to detect it.
    CdiSignalType signal_array[2];
    signal_array[0] = notification_signal;
    signal_array[1] = comp_queue_signal;

    // Packets are sent to the endpoint in batches starting with a single packet. The number is doubled with each
    // batch. This gives a quick start but as the queue backs up, the larger batch sizes lead to higher efficiency
    // per batch.
    int batch_size = 1;

    // These variables are used only within the scope of the while loop below but they must be declared outside of it
    // since their values need to start initialized but not get reinitialized every time through.
    bool last_packet = false;
    TxPacketWorkRequest* work_request_ptr = NULL;
    CdiSinglyLinkedList packet_list;
    CdiSinglyLinkedListInit(&packet_list);

    // The state machine goes through the states like:
    //
    //   +-----> idle -+
    //   |             |
    //   |     +-------+
    //   |     |
    //   |     +-> work received ->+
    //   |                         |
    //   |     +-------------------+
    //   |     |
    //   |  +->+-> get work request ->+
    //   |  |                         |
    //   |  |     +-------------------+
    //   |  |     |
    //   |  |     +-> packetizing ->+
    //   |  |                       |
    //   |  +<----------------------+  <-- list of packets to enqueue is incomplete
    //   |  ^                       |
    //   |  |  +--------------------+  <-- list of packets to enqueue is complete
    //   |  |  |
    //   |  |  +-> enqueueing ->+
    //   |  |                   |
    //   |  +-------------------+  <-- not last packet of payload
    //   |                      |
    //   +----------------------+  <-- last packet of the payload has been successfully queued
    enum {
        kPayloadStateIdle,           // No payload is in process: wait for payload from queue.
        kPayloadStateWorkReceived,   // A payload was received to be transmitted: initialize for first packet.
        kPayloadStateGetWorkRequest, // Payload and packetizer initialized: get a work request from pool.
        kPayloadStatePacketizing,    // Have work request: build SGL.
        kPayloadStateEnqueuing       // Have completed list of work requests: queued to the adapter.
    } payload_processing_state = kPayloadStateIdle;

    // This loop should only block at the call to CdiQueuePopWaitMultiple(). If a pool runs dry or the output queue is
    // full, the logic inside of the loop should maintain enough state to suspend the process of packetizing the current
    // payload and resume when resources are available.
    TxPayloadState* payload_state_ptr = NULL;
    while (!CdiOsSignalGet(con_state_ptr->shutdown_signal) && !EndpointManagerIsConnectionShuttingDown(mgr_handle)) {
        uint32_t signal_index = 0;
        bool payload_received = false;
        if (kPayloadStateIdle == payload_processing_state) {
            // Wait for work from the payload queue, the work request complete queue, or a signal from the endpoint
            // manager.
            payload_received = CdiQueuePopWaitMultiple(con_state_ptr->tx_state.payload_queue_handle, CDI_INFINITE,
                                                       signal_array, 2, &signal_index, (void**)&payload_state_ptr);
        } else {
            // A payload is currently in process. Wait for completion requests or a signal from the Endpoint Manager.
            CdiOsSignalsWait(signal_array, 2, false, CDI_INFINITE, &signal_index);
        }
        if (!payload_received) {
            // Either processing an existing payload or did not get a new one. Got a signal from either the Endpoint
            // Manager or work_req_comp_queue_handle (the queue contains data).
            if (0 == signal_index) {
                // Got a notification_signal. The endpoint state has changed, so wait until it has completed.
                EndpointManagerThreadWait(mgr_handle);
                // An Endpoint Manager state change means that Tx resources have been flushed or queued to be flushed,
                // including the current Tx payload that we could be processing. Reset our current payload state back to
                // idle. Allow the logic to drop below so if needed ProcessWorkRequestCompletionQueue() is invoked.
                payload_processing_state = kPayloadStateIdle;
                payload_state_ptr = NULL;
            }
        } else {
            payload_processing_state = kPayloadStateWorkReceived;
            // Increment reference counter once at the start of each payload. This will keep the PollThread() working as
            // long as we have payloads and their related packets to send.
            CdiOsAtomicInc32(&payload_state_ptr->cdi_endpoint_handle->adapter_endpoint_ptr->tx_in_flight_ref_count);
            CdiOsSignalSet(con_state_ptr->adapter_connection_ptr->tx_poll_do_work_signal);
        }

        // Always check the completion queue here. Don't want to starve it in case either several Endpoint Manager
        // notifications are received or the payload_queue_handle doesn't go empty.
        if (CdiOsSignalReadState(comp_queue_signal)) {
            ProcessWorkRequestCompletionQueue(con_state_ptr);
        }

        // Either resume work on a payload in progress or start a new one.
        if (kPayloadStateWorkReceived == payload_processing_state) {
            // No packet was in progress so start by initializing for the first one.

            // Increment payload number. NOTE: This is done here on the read side of the queue rather than on the write
            // side of the queue because the write side fails if the queue is full. This would cause payload_num to
            // increment erroneously. By incrementing here on the read side, this problem is avoided.
            payload_state_ptr->payload_packet_state.payload_num =
                GetNextPayloadNum(payload_state_ptr->cdi_endpoint_handle);

            if (CdiLogComponentIsEnabled(con_state_ptr, kLogComponentPayloadConfig)) {
                // Dump payload configuration to log or stdout.
                DumpPayloadConfiguration(&payload_state_ptr->app_payload_cb_data.core_extra_data,
                                         payload_state_ptr->app_payload_cb_data.extra_data_size,
                                         payload_state_ptr->app_payload_cb_data.extra_data_array,
                                         con_state_ptr->protocol_type);
            }

            // Prepare packetizer for first packet.
            PayloadPacketizerStateInit(packetizer_state_handle);

            // Every payload starts with an empty packet list and the smallest batch size.
            CdiSinglyLinkedListInit(&packet_list);
            batch_size = 1;
            last_packet = false;

            payload_processing_state = kPayloadStateGetWorkRequest;  // Advance the state machine.
        }

        // Only the three "payload in progress" states are handled by the inner loop. In those states
        // payload_state_ptr is non-NULL because it is only cleared together with a transition to the idle state.
        bool keep_going = kPayloadStateGetWorkRequest == payload_processing_state ||
                          kPayloadStatePacketizing == payload_processing_state ||
                          kPayloadStateEnqueuing == payload_processing_state;
        while (keep_going) {
            // When the connection goes down, no need to use resources to continue creating packets or adding them to
            // the adapter's queue. If the adapter's queue gets full it will start generating queue full log message
            // errors.
            AdapterEndpointHandle adapter_endpoint_handle =
                EndpointManagerEndpointToAdapterEndpoint(payload_state_ptr->cdi_endpoint_handle);
            if (kCdiConnectionStatusConnected != adapter_endpoint_handle->connection_status_code) {
                // The processing state is left as it is; the outer loop goes back to waiting on the signals.
                break;
            }
            if (kPayloadStateGetWorkRequest == payload_processing_state) {
                // NOTE: This pool is not thread-safe, so must ensure that only one thread is accessing it at a time.
                if (!CdiPoolGet(con_state_ptr->tx_state.work_request_pool_handle, (void**)&work_request_ptr)) {
                    // No work request available; stay in this state and retry after the next signal.
                    keep_going = false;
                } else {
                    payload_processing_state = kPayloadStatePacketizing;
                }
            }

            if (kPayloadStatePacketizing == payload_processing_state) {
                // NOTE: These pools are not thread-safe, so must ensure that only one thread is accessing them at a
                // time.
                if (!PayloadPacketizerPacketGet(adapter_endpoint_handle->protocol_handle,
                                                packetizer_state_handle, (char*)&work_request_ptr->header,
                                                con_state_ptr->tx_state.packet_sgl_entry_pool_handle,
                                                payload_state_ptr, &work_request_ptr->packet.sg_list, &last_packet))
                {
                    // Pool is empty; suspend processing the payload for now, retry after resources are freed. The
                    // state stays at packetizing, so the work request already obtained is reused on the retry.
                    keep_going = false;
                } else {
#ifdef DEBUG_TX_PACKET_SGL_ENTRIES
                    DebugTxPacketSglEntries(work_request_ptr);
#endif
                    // Fill in the work request with the specifics of the packet.
                    work_request_ptr->payload_state_ptr = payload_state_ptr;
                    work_request_ptr->payload_num = payload_state_ptr->payload_packet_state.payload_num;
                    work_request_ptr->packet_payload_size =
                        payload_state_ptr->payload_packet_state.packet_payload_data_size;

                    // This pointer will be used later by TxPacketWorkRequestComplete() to get access to
                    // work_request_ptr (a pointer to a TxPacketWorkRequest structure).
                    work_request_ptr->packet.sg_list.internal_data_ptr = work_request_ptr;

                    // Set flag for last packet of the payload so ACKs received can keep track of the number of
                    // in-flight payloads.
                    work_request_ptr->packet.payload_last_packet = last_packet;

                    // Add the packet to a list to be enqueued to the adapter.
                    CdiSinglyLinkedListPushTail(&packet_list, &work_request_ptr->packet.list_entry);
                    // Increment reference counter once for each packet.
                    CdiOsAtomicInc32(&payload_state_ptr->cdi_endpoint_handle->adapter_endpoint_ptr->tx_in_flight_ref_count);

                    // Enqueue once the batch is full or the payload is finished; otherwise build another packet.
                    payload_processing_state = (last_packet || CdiSinglyLinkedListSize(&packet_list) >= batch_size) ?
                        kPayloadStateEnqueuing : kPayloadStateGetWorkRequest;
                }
            }

            if (kPayloadStateEnqueuing == payload_processing_state) {
                // Enqueue packets. packet_list is copied so it can simply be initialized here to start fresh.
                if (kCdiStatusOk != CdiAdapterEnqueueSendPackets(
                        EndpointManagerEndpointToAdapterEndpoint(payload_state_ptr->cdi_endpoint_handle),
                        &packet_list)) {
                    // Enqueue failed; packet_list and the enqueuing state are kept so the same list is retried.
                    keep_going = false;
                } else {
                    CdiSinglyLinkedListInit(&packet_list);
                    batch_size *= 2;

                    if (last_packet) {
                        // The last packet of the payload has been sent; reset to start a new one.
                        payload_processing_state = kPayloadStateIdle;
                        payload_state_ptr = NULL;
                        keep_going = false;
                        // Successfully put all packets for a payload into Tx queue, so reset the back pressure state.
                        con_state_ptr->back_pressure_state = kCdiBackPressureNone;
                    } else {
                        payload_processing_state = kPayloadStateGetWorkRequest;
                    }
                }
            }
        }
    }

    PayloadPacketizerDestroy(packetizer_state_handle);
    if (EndpointManagerIsConnectionShuttingDown(mgr_handle)) {
        // Since this thread was registered with the Endpoint Manager using EndpointManagerThreadRegister(), need to
        // wait for the Endpoint Manager to complete the shutdown.
        EndpointManagerThreadWait(mgr_handle);
    }

    return 0; // Return code not used.
}