in src/cdi/receive_buffer.c [73:199]
CDI_THREAD ReceiveBufferThread(void* ptr)
{
ReceiveBufferState* state_ptr = (ReceiveBufferState*)ptr;
// Set this thread to use the connection's log. Can now use CDI_LOG_THREAD() for logging within this thread.
CdiLoggerThreadLogSet(state_ptr->log_handle);
CdiList delay_list;
CdiListInit(&delay_list);
int missed_count = kMaxMissed; // Cause the first received payload to reset the timestamp offset.
int64_t t_offset = 0; // The difference between payload timestamps and the monotonic clock.
uint32_t timeout_ms = CDI_INFINITE;
while (!CdiOsSignalGet(state_ptr->shutdown_signal)) {
// Wait for work to do. If the queue is empty, we will wait for data or the shutdown signal.
AppPayloadCallbackData app_cb_data;
if (CdiQueuePopWait(state_ptr->input_queue_handle, timeout_ms, state_ptr->shutdown_signal,
(void**)&app_cb_data)) {
const uint64_t payload_timestamp_us =
CdiUtilityPtpTimestampToMicroseconds(&app_cb_data.core_extra_data.origination_ptp_timestamp);
const uint64_t now = TaiNowMicroseconds();
// Reset t_offset if necessary. This is done before incrementing missed_count so that the offset is set to
// the extreme end of the window even if the first payload's timestamp is close enough to "now."
if (missed_count >= kMaxMissed) {
t_offset = now - payload_timestamp_us;
missed_count = 0;
} else if (t_offset + payload_timestamp_us < now - state_ptr->buffer_delay_microseconds ||
t_offset + payload_timestamp_us > now) {
missed_count++;
} else {
missed_count = 0;
}
// Compute the send time.
const uint64_t send_time = payload_timestamp_us + state_ptr->buffer_delay_microseconds + t_offset;
// Put the payload into the output queue if it's already late.
if (send_time <= now) {
app_cb_data.receive_buffer_send_time = send_time;
CdiQueuePush(state_ptr->output_queue_handle, &app_cb_data);
} else {
// Cap send time to now + delay.
app_cb_data.receive_buffer_send_time = CDI_MIN(send_time, now + state_ptr->buffer_delay_microseconds);
// The input and output queues provide copies of the AppPayloadCallbackData structure. Get an item out
// of the pool to store the data in while it's being delayed.
AppPayloadCallbackData* pool_item_ptr = NULL;
if (!CdiPoolGet(state_ptr->delay_pool_handle, (void**)&pool_item_ptr)) {
CDI_LOG_THREAD(kLogCritical,
"Failed to get AppPayloadCallbackData from pool. Throwing away payload [%10u.%09u]",
app_cb_data.core_extra_data.origination_ptp_timestamp.seconds,
app_cb_data.core_extra_data.origination_ptp_timestamp.nanoseconds);
} else {
*pool_item_ptr = app_cb_data; // Copy the callback data into the pool item storage.
// Place the payload int the delay line with its position determined by send time.
CdiListIterator list_iterator;
CdiListIteratorInit(&delay_list, &list_iterator);
AppPayloadCallbackData* entry_ptr;
while (NULL != (entry_ptr = (AppPayloadCallbackData*)CdiListIteratorGetNext(&list_iterator))) {
const uint64_t entry_send_time = entry_ptr->receive_buffer_send_time;
if (entry_send_time > app_cb_data.receive_buffer_send_time) {
CdiListAddBefore(&delay_list, &pool_item_ptr->list_entry, &entry_ptr->list_entry);
break;
}
}
if (NULL == entry_ptr) {
CdiListAddTail(&delay_list, &pool_item_ptr->list_entry);
}
}
}
}
// Take items out of the delay line until the first one that needs to remain in it is encountered.
AppPayloadCallbackData* app_cb_data_ptr = NULL;
while (NULL != (app_cb_data_ptr = (AppPayloadCallbackData*)CdiListPeek(&delay_list))) {
// Get "now" and send time of payload at the head of the delay line.
const uint64_t now = TaiNowMicroseconds();
const uint64_t send_time = app_cb_data_ptr->receive_buffer_send_time;
// Place payload into the output queue if its send time has already passed or if send time is too far in the
// future. This will happen if host clock has been set backwards by more than the delay time after the
// payload was put in the list.
if (send_time <= now || send_time > now + state_ptr->buffer_delay_microseconds) {
if (!CdiQueuePush(state_ptr->output_queue_handle, app_cb_data_ptr)) {
PayloadErrorFreeBuffer(state_ptr->error_message_pool, app_cb_data_ptr);
}
// Free the pool storage now that its data has been copied into the queue item's storage.
CdiPoolPut(state_ptr->delay_pool_handle, app_cb_data_ptr);
// Pop the head entry out of the delay line.
CdiListPop(&delay_list);
} else {
// Since items are ordered by send time, there's no point looking any farther than the first payload
// whose send time has not yet been reached.
break;
}
}
// Figure out the maximum wait time for the next queue pop based on payload at the head of the queue.
if (NULL == app_cb_data_ptr) {
// The delay line is empty now so wait indefinitely until the next payload arrives in the input queue.
timeout_ms = CDI_INFINITE;
} else {
// Round up to the next millisecond to prevent consuming unproductive CPU cycles. If, for example, the next
// payload send time is 500 microseconds in the future, rounding down would give a zero millisecond wait
// time. Assuming no new payloads arrive in the input queue during that time, the loop will run continuously
// doing no useful work until 500 microseconds have passed. The trade off is that payloads will be delayed
// up to an extra millisecond.
const uint64_t now = TaiNowMicroseconds();
const uint64_t send_time = app_cb_data_ptr->receive_buffer_send_time;
timeout_ms = (now >= send_time) ? 0 : ((send_time - now + 999) / 1000);
}
}
// Send the entries in the delay line on to callback thread and return items to the intermediate storage pool.
void* item_ptr = NULL;
while (NULL != (item_ptr = CdiListPop(&delay_list))) {
if (!CdiQueuePush(state_ptr->output_queue_handle, item_ptr)) {
PayloadErrorFreeBuffer(state_ptr->error_message_pool, (AppPayloadCallbackData*)item_ptr);
}
CdiPoolPut(state_ptr->delay_pool_handle, item_ptr);
}
return 0; // Return value is not used for anything.
}