STATUS streamFragmentAckEvent()

in src/client/src/StreamEvent.c [784:914]


STATUS streamFragmentAckEvent(PKinesisVideoStream pKinesisVideoStream, UPLOAD_HANDLE uploadHandle, PFragmentAck pFragmentAck)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PKinesisVideoClient pKinesisVideoClient = NULL;
    PUploadHandleInfo pUploadHandleInfo;
    BOOL locked = FALSE, inView = FALSE;
    UINT64 timestamp = 0, errorSkipStart, curIndex;
    PViewItem pViewItem;

    CHK(pKinesisVideoStream != NULL && pKinesisVideoStream->pKinesisVideoClient != NULL && pFragmentAck != NULL, STATUS_NULL_ARG);

    // Check the version
    CHK(pFragmentAck->version <= FRAGMENT_ACK_CURRENT_VERSION, STATUS_INVALID_FRAGMENT_ACK_VERSION);

    // Early return if we have an IDLE ack
    if (pFragmentAck->ackType == FRAGMENT_ACK_TYPE_IDLE) {
        // Nothing to do if we have an IDLE ack
        CHK(FALSE, retStatus);
    }

    pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;

    // Lock the state
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    locked = TRUE;

    // First of all, check if the ACK is for a session that's expired/closed already and ignore if it is
    pUploadHandleInfo = getStreamUploadInfo(pKinesisVideoStream, uploadHandle);
    if (NULL == pUploadHandleInfo ||
        (pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_ERROR || pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED) ||
        !IS_VALID_UPLOAD_HANDLE(pUploadHandleInfo->handle)) {
        // No session is present - early return
        DLOGW("An ACK is received for an already expired upload handle %" PRIu64, uploadHandle);
        CHK(FALSE, retStatus);
    }

    // Fix-up the invalid timestamp
    if (!IS_VALID_TIMESTAMP(pFragmentAck->timestamp)) {
        // Use the current from the view.
        // If the current is greater than head which is the case when the networking pulls fast enough
        // then we will use the head instead.
        CHK_STATUS(contentViewGetCurrentIndex(pKinesisVideoStream->pView, &curIndex));
        CHK_STATUS(contentViewGetHead(pKinesisVideoStream->pView, &pViewItem));

        if (curIndex < pViewItem->index) {
            CHK_STATUS(contentViewGetItemAt(pKinesisVideoStream->pView, curIndex, &pViewItem));
        }

        timestamp = pViewItem->ackTimestamp;

        // If we already have an persisted ACK, no need to go farther back
        errorSkipStart =
            IS_VALID_TIMESTAMP(pUploadHandleInfo->lastPersistedAckTs) ? pUploadHandleInfo->lastPersistedAckTs : pUploadHandleInfo->timestamp;
    } else {
        // Calculate the timestamp based on the ACK.
        // Convert the timestamp
        CHK_STATUS(mkvgenTimecodeToTimestamp(pKinesisVideoStream->pMkvGenerator, pFragmentAck->timestamp, &timestamp));

        // In case we have a relative cluster timestamp stream we need to adjust for the stream start.
        // The stream start timestamp is extracted from the stream session map.
        if (!pKinesisVideoStream->streamInfo.streamCaps.absoluteFragmentTimes && IS_VALID_TIMESTAMP(pUploadHandleInfo->timestamp)) {
            // Adjust the relative timestamp to make an absolute timestamp
            timestamp += pUploadHandleInfo->timestamp;
        }

        // No skipping farther than the given timestamp
        errorSkipStart = timestamp;
    }

    // Quick check if we have the timestamp in the view window and if not then bail out early
    CHK_STATUS(contentViewTimestampInRange(pKinesisVideoStream->pView, timestamp, TRUE, &inView));

    // NOTE: IMPORTANT: For the Error Ack case we will still need to process the ACK. The side-effect of
    // processing the Error Ack is the connection termination which is needed as the higher-level clients like
    // CURL might not trigger the termination after the LB sends 'FIN' TCP packet.
    switch (pFragmentAck->ackType) {
        case FRAGMENT_ACK_TYPE_BUFFERING:
            if (inView) {
                CHK_STATUS(streamFragmentBufferingAck(pKinesisVideoStream, timestamp));
            }

            break;
        case FRAGMENT_ACK_TYPE_RECEIVED:
            if (inView) {
                CHK_STATUS(streamFragmentReceivedAck(pKinesisVideoStream, timestamp));
            }

            break;
        case FRAGMENT_ACK_TYPE_PERSISTED:
            if (inView) {
                CHK_STATUS(streamFragmentPersistedAck(pKinesisVideoStream, timestamp, pUploadHandleInfo));
            }

            break;
        case FRAGMENT_ACK_TYPE_ERROR:
            if (!inView) {
                // Apply to the earliest.
                CHK_STATUS(contentViewGetTail(pKinesisVideoStream->pView, &pViewItem));
                timestamp = pViewItem->ackTimestamp;
            }

            CHK_STATUS(streamFragmentErrorAck(pKinesisVideoStream, errorSkipStart, timestamp, pFragmentAck->result));

            break;
        default:
            // shouldn't ever be the case.
            CHK(FALSE, STATUS_INVALID_FRAGMENT_ACK_TYPE);
    }

    // Still return an error on an out-of-bounds timestamp
    CHK(inView, STATUS_ACK_TIMESTAMP_NOT_IN_VIEW_WINDOW);

CleanUp:

    if (pKinesisVideoClient != NULL) {
        // We will notify the fragment ACK received callback even if the processing failed
        if (pKinesisVideoClient->clientCallbacks.fragmentAckReceivedFn != NULL) {
            pKinesisVideoClient->clientCallbacks.fragmentAckReceivedFn(pKinesisVideoClient->clientCallbacks.customData,
                                                                       TO_STREAM_HANDLE(pKinesisVideoStream), uploadHandle, pFragmentAck);
        }

        // Unlock the stream
        if (locked) {
            pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
        }
    }

    LEAVES();
    return retStatus;
}