in src/client/src/Stream.c [2442:2556]
STATUS streamFragmentPersistedAck(PKinesisVideoStream pKinesisVideoStream, UINT64 timestamp, PUploadHandleInfo pUploadHandleInfo)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS, setViewStatus = STATUS_SUCCESS;
PViewItem pCurItem;
UINT64 curItemIndex = 0, duration, viewByteSize, boundaryItemIndex, data;
BOOL setCurrentBack = FALSE, trimTail = TRUE, getNextBoundaryItem = TRUE;
PKinesisVideoClient pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;
StackQueueIterator iterator;
PUploadHandleInfo pCurHandleInfo;
// The state and the params are validated.
// We need to find the next fragment to the persistent one and
// trim the window to that fragment - i.e. move the tail position to the fragment.
// As we move the tail, the callbacks will be fired to process the items falling out of the window.
// Update last persistedAck timestamp
pUploadHandleInfo->lastPersistedAckTs = timestamp;
// Store for metrics purposes
pKinesisVideoStream->diagnostics.persistedAcks++;
// Get the fragment start frame.
CHK_STATUS(contentViewGetItemWithTimestamp(pKinesisVideoStream->pView, timestamp, TRUE, &pCurItem));
SET_ITEM_PERSISTED_ACK(pCurItem->flags);
// Iterate linearly and find the first ready state handle
CHK_STATUS(stackQueueGetIterator(pKinesisVideoStream->pUploadInfoQueue, &iterator));
while (IS_VALID_ITERATOR(iterator)) {
CHK_STATUS(stackQueueIteratorGetItem(iterator, &data));
pCurHandleInfo = (PUploadHandleInfo) data;
CHK(pCurHandleInfo != NULL, STATUS_INTERNAL_ERROR);
if (pCurHandleInfo->handle == pUploadHandleInfo->handle) {
break;
}
if (!IS_UPLOAD_HANDLE_READY_TO_TRIM(pCurHandleInfo)) {
// got a earlier handle that hasn't finished yet. Therefore cannot trim tail.
trimTail = FALSE;
break;
}
CHK_STATUS(stackQueueIteratorNext(&iterator));
}
// Check if in view and when a persisted ack for upload handle n arrives, we dont trim off upload handle
// n-1 until upload handle n-1 has received its last persisted ack. If handle n-1 is in
// UPLOAD_HANDLE_STATE_ACK_RECEIVED or UPLOAD_HANDLE_STATE_TERMINATED state, then it has received the last
// ack and is safe to trim off.
CHK(trimTail, retStatus);
// Remember the current index
CHK_STATUS(contentViewGetCurrentIndex(pKinesisVideoStream->pView, &curItemIndex));
// Set the current to the first frame of the ACKed fragments next
CHK_STATUS(contentViewSetCurrentIndex(pKinesisVideoStream->pView, pCurItem->index + 1));
setCurrentBack = TRUE;
// Find the next boundary item which will indicate the start of the next fragment.
// NOTE: This might fail if we are still assembling a fragment and the ACK is for the previous fragment.
// Skip over already persisted fragments.
while (getNextBoundaryItem) {
retStatus = getNextBoundaryViewItem(pKinesisVideoStream, &pCurItem);
CHK(retStatus == STATUS_SUCCESS || retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS, retStatus);
if (retStatus != STATUS_SUCCESS || !CHECK_ITEM_FRAGMENT_START(pCurItem->flags) || !CHECK_ITEM_PERSISTED_ACK(pCurItem->flags)) {
getNextBoundaryItem = FALSE;
}
}
// Check if we need to process the awaiting upload info
if (WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_AWAITING_ACK &&
timestamp == pUploadHandleInfo->lastFragmentTs) {
// Reset the state to ACK received
pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_ACK_RECEIVED;
// Get the available duration and size to send
CHK_STATUS(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize));
// Notify the awaiting handle to enable it
CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
pUploadHandleInfo->handle, duration, viewByteSize));
}
// Reset the status and early exit in case we have no more items which means the tail is current
if (retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS) {
retStatus = STATUS_SUCCESS;
CHK(FALSE, retStatus);
}
boundaryItemIndex = pCurItem->index;
// If the boundary item is END_OF_FRAGMENT, then it should be trimmed too.
if (CHECK_ITEM_FRAGMENT_END(pCurItem->flags)) {
boundaryItemIndex++;
}
// Trim the tail
CHK_STATUS(contentViewTrimTail(pKinesisVideoStream->pView, boundaryItemIndex));
// Notify in case of an OFFLINE stream since tail has been trimmed
if (IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
pKinesisVideoClient->clientCallbacks.broadcastConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData,
pKinesisVideoStream->bufferAvailabilityCondition);
}
CleanUp:
// Set the current back if we had modified it
if (setCurrentBack && STATUS_FAILED((setViewStatus = contentViewSetCurrentIndex(pKinesisVideoStream->pView, curItemIndex)))) {
DLOGW("Failed to set the current back to index %" PRIu64 " with status 0x%08x", curItemIndex, setViewStatus);
}
LEAVES();
return retStatus;
}