CDI_THREAD ReceiveBufferThread()

in src/cdi/receive_buffer.c [73:199]


CDI_THREAD ReceiveBufferThread(void* ptr)
{
    ReceiveBufferState* state_ptr = (ReceiveBufferState*)ptr;

    // Set this thread to use the connection's log. Can now use CDI_LOG_THREAD() for logging within this thread.
    CdiLoggerThreadLogSet(state_ptr->log_handle);

    CdiList delay_list;
    CdiListInit(&delay_list);

    int missed_count = kMaxMissed;  // Cause the first received payload to reset the timestamp offset.
    int64_t t_offset = 0;  // The difference between payload timestamps and the monotonic clock.
    uint32_t timeout_ms = CDI_INFINITE;

    while (!CdiOsSignalGet(state_ptr->shutdown_signal)) {
        // Wait for work to do. If the queue is empty, we will wait for data or the shutdown signal.
        AppPayloadCallbackData app_cb_data;
        if (CdiQueuePopWait(state_ptr->input_queue_handle, timeout_ms, state_ptr->shutdown_signal,
                            (void**)&app_cb_data)) {
            const uint64_t payload_timestamp_us =
                CdiUtilityPtpTimestampToMicroseconds(&app_cb_data.core_extra_data.origination_ptp_timestamp);
            const uint64_t now = TaiNowMicroseconds();

            // Reset t_offset if necessary. This is done before incrementing missed_count so that the offset is set to
            // the extreme end of the window even if the first payload's timestamp is close enough to "now."
            if (missed_count >= kMaxMissed) {
                t_offset = now - payload_timestamp_us;
                missed_count = 0;
           } else if (t_offset + payload_timestamp_us < now - state_ptr->buffer_delay_microseconds ||
                t_offset + payload_timestamp_us > now) {
                missed_count++;
            } else {
                missed_count = 0;
            }

            // Compute the send time.
            const uint64_t send_time = payload_timestamp_us + state_ptr->buffer_delay_microseconds + t_offset;

            // Put the payload into the output queue if it's already late.
            if (send_time <= now) {
                app_cb_data.receive_buffer_send_time = send_time;
                CdiQueuePush(state_ptr->output_queue_handle, &app_cb_data);
            } else {
                // Cap send time to now + delay.
                app_cb_data.receive_buffer_send_time = CDI_MIN(send_time, now + state_ptr->buffer_delay_microseconds);

                // The input and output queues provide copies of the AppPayloadCallbackData structure. Get an item out
                // of the pool to store the data in while it's being delayed.
                AppPayloadCallbackData* pool_item_ptr = NULL;
                if (!CdiPoolGet(state_ptr->delay_pool_handle, (void**)&pool_item_ptr)) {
                    CDI_LOG_THREAD(kLogCritical,
                                   "Failed to get AppPayloadCallbackData from pool. Throwing away payload [%10u.%09u]",
                                   app_cb_data.core_extra_data.origination_ptp_timestamp.seconds,
                                   app_cb_data.core_extra_data.origination_ptp_timestamp.nanoseconds);
                } else {
                    *pool_item_ptr = app_cb_data;  // Copy the callback data into the pool item storage.

                    // Place the payload int the delay line with its position determined by send time.
                    CdiListIterator list_iterator;
                    CdiListIteratorInit(&delay_list, &list_iterator);
                    AppPayloadCallbackData* entry_ptr;
                    while (NULL != (entry_ptr = (AppPayloadCallbackData*)CdiListIteratorGetNext(&list_iterator))) {
                        const uint64_t entry_send_time = entry_ptr->receive_buffer_send_time;
                        if (entry_send_time > app_cb_data.receive_buffer_send_time) {
                            CdiListAddBefore(&delay_list, &pool_item_ptr->list_entry, &entry_ptr->list_entry);
                            break;
                        }
                    }
                    if (NULL == entry_ptr) {
                        CdiListAddTail(&delay_list, &pool_item_ptr->list_entry);
                    }
                }
            }
        }

        // Take items out of the delay line until the first one that needs to remain in it is encountered.
        AppPayloadCallbackData* app_cb_data_ptr = NULL;
        while (NULL != (app_cb_data_ptr = (AppPayloadCallbackData*)CdiListPeek(&delay_list))) {
            // Get "now" and send time of payload at the head of the delay line.
            const uint64_t now = TaiNowMicroseconds();
            const uint64_t send_time = app_cb_data_ptr->receive_buffer_send_time;

            // Place payload into the output queue if its send time has already passed or if send time is too far in the
            // future. This will happen if host clock has been set backwards by more than the delay time after the
            // payload was put in the list.
            if (send_time <= now || send_time > now + state_ptr->buffer_delay_microseconds) {
                if (!CdiQueuePush(state_ptr->output_queue_handle, app_cb_data_ptr)) {
                    PayloadErrorFreeBuffer(state_ptr->error_message_pool, app_cb_data_ptr);
                }
                // Free the pool storage now that its data has been copied into the queue item's storage.
                CdiPoolPut(state_ptr->delay_pool_handle, app_cb_data_ptr);
                // Pop the head entry out of the delay line.
                CdiListPop(&delay_list);
            } else {
                // Since items are ordered by send time, there's no point looking any farther than the first payload
                // whose send time has not yet been reached.
                break;
            }
        }

        // Figure out the maximum wait time for the next queue pop based on payload at the head of the queue.
        if (NULL == app_cb_data_ptr) {
            // The delay line is empty now so wait indefinitely until the next payload arrives in the input queue.
            timeout_ms = CDI_INFINITE;
        } else {
            // Round up to the next millisecond to prevent consuming unproductive CPU cycles. If, for example, the next
            // payload send time is 500 microseconds in the future, rounding down would give a zero millisecond wait
            // time. Assuming no new payloads arrive in the input queue during that time, the loop will run continuously
            // doing no useful work until 500 microseconds have passed. The trade off is that payloads will be delayed
            // up to an extra millisecond.
            const uint64_t now = TaiNowMicroseconds();
            const uint64_t send_time = app_cb_data_ptr->receive_buffer_send_time;
            timeout_ms = (now >= send_time) ? 0 : ((send_time - now + 999) / 1000);
        }
    }

    // Send the entries in the delay line on to callback thread and return items to the intermediate storage pool.
    void* item_ptr = NULL;
    while (NULL != (item_ptr = CdiListPop(&delay_list))) {
        if (!CdiQueuePush(state_ptr->output_queue_handle, item_ptr)) {
            PayloadErrorFreeBuffer(state_ptr->error_message_pool, (AppPayloadCallbackData*)item_ptr);
        }
        CdiPoolPut(state_ptr->delay_pool_handle, item_ptr);
    }

    return 0;  // Return value is not used for anything.
}