STATUS streamFragmentPersistedAck()

in src/client/src/Stream.c [2442:2556]


STATUS streamFragmentPersistedAck(PKinesisVideoStream pKinesisVideoStream, UINT64 timestamp, PUploadHandleInfo pUploadHandleInfo)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS, setViewStatus = STATUS_SUCCESS;
    PViewItem pCurItem;
    UINT64 curItemIndex = 0, duration, viewByteSize, boundaryItemIndex, data;
    BOOL setCurrentBack = FALSE, trimTail = TRUE, getNextBoundaryItem = TRUE;
    PKinesisVideoClient pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;
    StackQueueIterator iterator;
    PUploadHandleInfo pCurHandleInfo;

    // The state and the params are validated.
    // We need to find the next fragment to the persistent one and
    // trim the window to that fragment - i.e. move the tail position to the fragment.
    // As we move the tail, the callbacks will be fired to process the items falling out of the window.

    // Update last persistedAck timestamp
    pUploadHandleInfo->lastPersistedAckTs = timestamp;

    // Store for metrics purposes
    pKinesisVideoStream->diagnostics.persistedAcks++;

    // Get the fragment start frame.
    CHK_STATUS(contentViewGetItemWithTimestamp(pKinesisVideoStream->pView, timestamp, TRUE, &pCurItem));
    SET_ITEM_PERSISTED_ACK(pCurItem->flags);

    // Iterate linearly and find the first ready state handle
    CHK_STATUS(stackQueueGetIterator(pKinesisVideoStream->pUploadInfoQueue, &iterator));
    while (IS_VALID_ITERATOR(iterator)) {
        CHK_STATUS(stackQueueIteratorGetItem(iterator, &data));

        pCurHandleInfo = (PUploadHandleInfo) data;
        CHK(pCurHandleInfo != NULL, STATUS_INTERNAL_ERROR);
        if (pCurHandleInfo->handle == pUploadHandleInfo->handle) {
            break;
        }
        if (!IS_UPLOAD_HANDLE_READY_TO_TRIM(pCurHandleInfo)) {
            // got a earlier handle that hasn't finished yet. Therefore cannot trim tail.
            trimTail = FALSE;
            break;
        }

        CHK_STATUS(stackQueueIteratorNext(&iterator));
    }

    // Check if in view and when a persisted ack for upload handle n arrives, we dont trim off upload handle
    // n-1 until upload handle n-1 has received its last persisted ack. If handle n-1 is in
    // UPLOAD_HANDLE_STATE_ACK_RECEIVED or UPLOAD_HANDLE_STATE_TERMINATED state, then it has received the last
    // ack and is safe to trim off.
    CHK(trimTail, retStatus);

    // Remember the current index
    CHK_STATUS(contentViewGetCurrentIndex(pKinesisVideoStream->pView, &curItemIndex));

    // Set the current to the first frame of the ACKed fragments next
    CHK_STATUS(contentViewSetCurrentIndex(pKinesisVideoStream->pView, pCurItem->index + 1));
    setCurrentBack = TRUE;

    // Find the next boundary item which will indicate the start of the next fragment.
    // NOTE: This might fail if we are still assembling a fragment and the ACK is for the previous fragment.
    // Skip over already persisted fragments.
    while (getNextBoundaryItem) {
        retStatus = getNextBoundaryViewItem(pKinesisVideoStream, &pCurItem);
        CHK(retStatus == STATUS_SUCCESS || retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS, retStatus);
        if (retStatus != STATUS_SUCCESS || !CHECK_ITEM_FRAGMENT_START(pCurItem->flags) || !CHECK_ITEM_PERSISTED_ACK(pCurItem->flags)) {
            getNextBoundaryItem = FALSE;
        }
    }

    // Check if we need to process the awaiting upload info
    if (WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_AWAITING_ACK &&
        timestamp == pUploadHandleInfo->lastFragmentTs) {
        // Reset the state to ACK received
        pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_ACK_RECEIVED;

        // Get the available duration and size to send
        CHK_STATUS(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize));

        // Notify the awaiting handle to enable it
        CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
            pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
            pUploadHandleInfo->handle, duration, viewByteSize));
    }

    // Reset the status and early exit in case we have no more items which means the tail is current
    if (retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS) {
        retStatus = STATUS_SUCCESS;
        CHK(FALSE, retStatus);
    }

    boundaryItemIndex = pCurItem->index;

    // If the boundary item is END_OF_FRAGMENT, then it should be trimmed too.
    if (CHECK_ITEM_FRAGMENT_END(pCurItem->flags)) {
        boundaryItemIndex++;
    }

    // Trim the tail
    CHK_STATUS(contentViewTrimTail(pKinesisVideoStream->pView, boundaryItemIndex));

    // Notify in case of an OFFLINE stream since tail has been trimmed
    if (IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
        pKinesisVideoClient->clientCallbacks.broadcastConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData,
                                                                          pKinesisVideoStream->bufferAvailabilityCondition);
    }
CleanUp:

    // Set the current back if we had modified it
    if (setCurrentBack && STATUS_FAILED((setViewStatus = contentViewSetCurrentIndex(pKinesisVideoStream->pView, curItemIndex)))) {
        DLOGW("Failed to set the current back to index %" PRIu64 " with status 0x%08x", curItemIndex, setViewStatus);
    }

    LEAVES();
    return retStatus;
}