STATUS getStreamData()

in src/client/src/Stream.c [1125:1561]


/**
 * Fills the caller supplied buffer with the next bytes to send for the specified upload handle.
 *
 * The call takes the stream lock for its whole duration and briefly takes the client lock
 * while the current view item's allocation is mapped and copied out of the heap.
 *
 * Processing order:
 *  1. If the connection state has the IN_USE bit set, the current view position is rolled back
 *     (to the tail in offline mode, otherwise via contentViewRollbackCurrent) and fixed up for reconnect.
 *  2. The upload handle state is evaluated; READY handles transition to STREAMING, handles that are
 *     awaiting/received the last ACK, terminated or in error return early with the respective status.
 *  3. The buffer is filled in a loop from, in priority order: packaged metadata (only in EOS sending state),
 *     packaged EOS (only in EOS sending state), and the view items.
 *  4. On exit, staleness detection and transfer metrics are run, the stream closed notification and the
 *     "data available" poke for the next handle are issued when applicable and a terminated handle is removed.
 *
 * @param pKinesisVideoStream - Stream object. Must not be NULL and must have a valid client object.
 * @param uploadHandle - Upload handle the data is requested for. Must be a valid upload handle.
 * @param pBuffer - Buffer to fill in. Must not be NULL.
 * @param bufferSize - Size of the buffer in bytes. Must not be 0.
 * @param pFillSize - OUT - Number of bytes stored in the buffer. Must not be NULL. Set to 0 as soon
 *                    as the arguments are validated so it is well-defined on every later exit path.
 *
 * @return STATUS_SUCCESS when the buffer has been entirely filled.
 *         STATUS_NULL_ARG / STATUS_INVALID_ARG on bad arguments.
 *         STATUS_NO_MORE_DATA_AVAILABLE when there is currently nothing more to send
 *         (STATUS_CONTENT_VIEW_NO_MORE_ITEMS is translated to this value).
 *         STATUS_END_OF_STREAM when the upload handle has been terminated.
 *         STATUS_AWAITING_PERSISTED_ACK when all the data is sent and the last persisted ACK is pending.
 *         STATUS_UPLOAD_HANDLE_ABORTED for an unknown or errored upload handle.
 *         A failed staleness check status takes precedence over any of the above.
 *         Any other failure status is propagated from the called functions.
 */
STATUS getStreamData(PKinesisVideoStream pKinesisVideoStream, UPLOAD_HANDLE uploadHandle, PBYTE pBuffer, UINT32 bufferSize, PUINT32 pFillSize)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS, stalenessCheckStatus = STATUS_SUCCESS, cleanupStatus = STATUS_SUCCESS;
    PKinesisVideoClient pKinesisVideoClient = NULL;
    PViewItem pViewItem = NULL;
    PBYTE pAlloc = NULL;
    UINT32 size = 0, remainingSize = bufferSize, uploadHandleCount;
    UINT64 allocSize, currentTime, duration = 0, viewByteSize = 0;
    PBYTE pCurPnt = pBuffer;
    BOOL streamLocked = FALSE, clientLocked = FALSE, heapMapped = FALSE, rollbackToLastAck, restarted = FALSE, eosSent = FALSE;
    DOUBLE transferRate, deltaInSeconds;
    PUploadHandleInfo pUploadHandleInfo = NULL, pNextUploadHandleInfo = NULL;

    CHK(pKinesisVideoStream != NULL && pKinesisVideoStream->pKinesisVideoClient != NULL && pBuffer != NULL && pFillSize != NULL, STATUS_NULL_ARG);
    CHK(bufferSize != 0 && IS_VALID_UPLOAD_HANDLE(uploadHandle), STATUS_INVALID_ARG);

    // Set the size first so every exit path past the argument validation
    // reports a well-defined fill size rather than the caller's uninitialized value.
    *pFillSize = 0;

    pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;

    // Lock the stream
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    streamLocked = TRUE;

    // If the state of the connection is IN_USE
    // and we are not in grace period
    // and we are not in a retry state on rotation
    // then we need to rollback the current view pointer
    if ((pKinesisVideoStream->connectionState & UPLOAD_CONNECTION_STATE_IN_USE) != UPLOAD_CONNECTION_STATE_NONE) {
        if (IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
            // In case of offline mode, we just need to set the current to the tail.
            CHK_STATUS(contentViewGetTail(pKinesisVideoStream->pView, &pViewItem));
            CHK_STATUS(contentViewSetCurrentIndex(pKinesisVideoStream->pView, pViewItem->index));
        } else {
            // Calculate the rollback duration based on the stream replay duration and the latest ACKs received.
            // If the acks are not enabled or we have potentially dead host situation then we rollback to the replay duration.
            // NOTE: The received ACK should be sequential and not out-of-order.
            rollbackToLastAck = pKinesisVideoStream->streamInfo.streamCaps.fragmentAcks &&
                CONNECTION_DROPPED_HOST_ALIVE(pKinesisVideoStream->connectionDroppedResult);

            CHK_STATUS(contentViewRollbackCurrent(pKinesisVideoStream->pView, pKinesisVideoStream->streamInfo.streamCaps.replayDuration, TRUE,
                                                  rollbackToLastAck));
        }

        // NOTE: No explicit "is there anything to load after the rollback" early exit is performed here.
        // An empty view is handled by the fill loop below.

        // Fix-up the current element
        CHK_STATUS(streamStartFixupOnReconnect(pKinesisVideoStream));

        // Remember the restart so the transfer rate is not computed across the reconnect
        restarted = TRUE;
    }

    // Reset the connection dropped indicator
    pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_OK;

    // Get the latest upload handle
    pUploadHandleInfo = getStreamUploadInfo(pKinesisVideoStream, uploadHandle);

    // Should indicate an abort for an invalid handle or handle that is not in the queue
    CHK(pUploadHandleInfo != NULL, STATUS_UPLOAD_HANDLE_ABORTED);

    // NOTE: Every case below which ends with CHK(FALSE, ...) unconditionally jumps to CleanUp
    // so there is no fall-through between the cases.
    switch (pUploadHandleInfo->state) {
        case UPLOAD_HANDLE_STATE_READY:
            // if we've created a new handle but has nothing to send
            if (pKinesisVideoStream->streamStopped) {
                // Get the duration and the size
                CHK_STATUS(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize));

                if (viewByteSize == 0) {
                    pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                    CHK(FALSE, STATUS_END_OF_STREAM);
                }
            }

            if (IS_VALID_TIMESTAMP(pKinesisVideoStream->newSessionTimestamp)) {
                // set upload handle start timestamp to map ack timestamp to view item timestamp in case
                // of using relative timestamp mode.
                pUploadHandleInfo->timestamp = pKinesisVideoStream->newSessionTimestamp;

                // Zero the sentinels
                pKinesisVideoStream->newSessionTimestamp = INVALID_TIMESTAMP_VALUE;
                pKinesisVideoStream->newSessionIndex = INVALID_VIEW_INDEX_VALUE;
            } else if (!pKinesisVideoStream->streamInfo.streamCaps.absoluteFragmentTimes) {
                // in relative time mode, upload handle timestamp is required.
                CHK(FALSE, STATUS_NO_MORE_DATA_AVAILABLE);
            }

            pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_STREAMING;

            // Update the active session count in metrics
            pKinesisVideoStream->diagnostics.totalActiveSessions++;

            break;
        case UPLOAD_HANDLE_STATE_AWAITING_ACK:
            // handle case where new handle got last ack already
            if (pUploadHandleInfo->lastPersistedAckTs == pUploadHandleInfo->lastFragmentTs) {
                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                CHK(FALSE, STATUS_END_OF_STREAM);
            }
            CHK(FALSE, STATUS_AWAITING_PERSISTED_ACK);

        case UPLOAD_HANDLE_STATE_ACK_RECEIVED:
            // Set the state to terminated and early return with the EOS with the right handle
            pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;

            DLOGI("Indicating an EOS after last persisted ACK is received for stream upload handle %" PRIu64, uploadHandle);
            CHK(FALSE, STATUS_END_OF_STREAM);

        case UPLOAD_HANDLE_STATE_TERMINATED:
            // This path get invoked if a connection get terminated by calling reset connection
            DLOGW("Indicating an end-of-stream for a terminated stream upload handle %" PRIu64, uploadHandle);
            CHK(FALSE, STATUS_END_OF_STREAM);

        case UPLOAD_HANDLE_STATE_ERROR:
            DLOGW("Indicating an abort for a errored stream upload handle %" PRIu64, uploadHandle);
            CHK(FALSE, STATUS_UPLOAD_HANDLE_ABORTED);
        default:
            // no-op for other UPLOAD_HANDLE states
            break;
    }

    // Continue filling the buffer from the point we left off
    do {
        // First and foremost, we need to check whether we need to
        // send the packaged not-yet sent metadata, then
        // we need to check whether we need to send the packaged EOS metadata
        // Next, we need to check whether we have an allocation handle.
        // This could happen in case when we start getting the data out or
        // when the last time we got the data the item fell off the view window.
        if (IS_UPLOAD_HANDLE_IN_SENDING_EOS_STATE(pUploadHandleInfo) && pKinesisVideoStream->metadataTracker.send) {
            // Check if we have finished sending the packaged metadata and reset the values
            if (pKinesisVideoStream->metadataTracker.offset == pKinesisVideoStream->metadataTracker.size) {
                pKinesisVideoStream->metadataTracker.offset = 0;
                pKinesisVideoStream->metadataTracker.send = FALSE;

                // Set to send the EoS
                pKinesisVideoStream->eosTracker.send = TRUE;
            } else {
                // Copy as much as we can
                size = MIN(remainingSize, pKinesisVideoStream->metadataTracker.size - pKinesisVideoStream->metadataTracker.offset);
                MEMCPY(pCurPnt, pKinesisVideoStream->metadataTracker.data + pKinesisVideoStream->metadataTracker.offset, size);

                // Set the values for the next iteration.
                pKinesisVideoStream->metadataTracker.offset += size;
                pCurPnt += size;
                remainingSize -= size;
                *pFillSize += size;
            }
        } else if (IS_UPLOAD_HANDLE_IN_SENDING_EOS_STATE(pUploadHandleInfo) && pKinesisVideoStream->eosTracker.send) {
            // The EOS has been entirely sent (or was marked as already sent) - finalize the handle state
            if (pKinesisVideoStream->eosTracker.offset == pKinesisVideoStream->eosTracker.size) {
                pKinesisVideoStream->eosTracker.offset = 0;
                pKinesisVideoStream->eosTracker.send = FALSE;

                // Calculate the state for the upload handle based on whether we need to await for the last ACK
                if (WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream)) {
                    if (IS_VALID_TIMESTAMP(pUploadHandleInfo->lastPersistedAckTs) &&
                        pUploadHandleInfo->lastPersistedAckTs == pUploadHandleInfo->lastFragmentTs) {
                        pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                        CHK(FALSE, STATUS_END_OF_STREAM);
                    }

                    pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_AWAITING_ACK;
                    DLOGI("Handle %" PRIu64 " waiting for last persisted ack with ts %" PRIu64, pUploadHandleInfo->handle,
                          pUploadHandleInfo->lastFragmentTs);

                    // Will terminate later after the ACK is received
                    CHK(FALSE, STATUS_AWAITING_PERSISTED_ACK);
                } else {
                    pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;

                    // The client should terminate and close the stream after finalizing any remaining transfer.
                    CHK(FALSE, STATUS_END_OF_STREAM);
                }
            }

            // Copy as much as we can
            size = MIN(remainingSize, pKinesisVideoStream->eosTracker.size - pKinesisVideoStream->eosTracker.offset);
            MEMCPY(pCurPnt, pKinesisVideoStream->eosTracker.data + pKinesisVideoStream->eosTracker.offset, size);

            // Set the values
            pKinesisVideoStream->eosTracker.offset += size;
            pCurPnt += size;
            remainingSize -= size;
            *pFillSize += size;
        } else if (!IS_VALID_ALLOCATION_HANDLE(pKinesisVideoStream->curViewItem.viewItem.handle)) {
            // Reset the current view item
            pKinesisVideoStream->curViewItem.offset = pKinesisVideoStream->curViewItem.viewItem.length = 0;

            // Fix-up the current item as it might be a stream start
            CHK_STATUS(resetCurrentViewItemStreamStart(pKinesisVideoStream));

            // Need to find the next key frame boundary in case of the current rolling out of the window
            CHK_STATUS(getNextBoundaryViewItem(pKinesisVideoStream, &pViewItem));

            if (pViewItem != NULL) {
                // Reset the item ACK flags as this might be replay after rollback
                CLEAR_ITEM_BUFFERING_ACK(pViewItem->flags);
                CLEAR_ITEM_RECEIVED_ACK(pViewItem->flags);
                // Use this as a current for the next iteration
                pKinesisVideoStream->curViewItem.viewItem = *pViewItem;
            } else {
                // Couldn't find any boundary items, default to empty - early return
                CHK(FALSE, STATUS_NO_MORE_DATA_AVAILABLE);
            }
        } else if (pKinesisVideoStream->curViewItem.offset == pKinesisVideoStream->curViewItem.viewItem.length) {
            // Check if the current was a special EoFr in which case we don't need to send EoS
            eosSent = CHECK_ITEM_FRAGMENT_END(pKinesisVideoStream->curViewItem.viewItem.flags) ? TRUE : FALSE;

            // Fix-up the current item as it might be a stream start
            CHK_STATUS(resetCurrentViewItemStreamStart(pKinesisVideoStream));

            // Second, we need to check whether the existing view item has been exhausted
            retStatus = getNextViewItem(pKinesisVideoStream, &pViewItem);
            CHK(retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS || retStatus == STATUS_SUCCESS, retStatus);

            // Special handling for the stopped stream.
            if (retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS) {
                // This requires user to call stop stream as soon as they are done.
                // Need to kick off the EOS sequence
                if (!pKinesisVideoStream->streamStopped) {
                    // Early return
                    CHK(FALSE, retStatus);
                }

                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATING;
                pKinesisVideoStream->eosTracker.send = TRUE;
                // Setting the offset to the size marks the EOS as already sent
                pKinesisVideoStream->eosTracker.offset = eosSent ? pKinesisVideoStream->eosTracker.size : 0;

                // If we have eosSent then we can't append the last metadata afterwards as it will break the MKV cluster
                if (eosSent) {
                    pKinesisVideoStream->metadataTracker.send = FALSE;
                }
            } else {
                // Reset the item ACK flags as this might be replay after rollback
                CLEAR_ITEM_BUFFERING_ACK(pViewItem->flags);
                CLEAR_ITEM_RECEIVED_ACK(pViewItem->flags);
                // Store it in the current view
                pKinesisVideoStream->curViewItem.viewItem = *pViewItem;
                pKinesisVideoStream->curViewItem.offset = 0;

                // Check if we are finishing the previous stream and we have a boundary item
                if (CHECK_ITEM_STREAM_START(pKinesisVideoStream->curViewItem.viewItem.flags)) {
                    // Set the stream handle as we have exhausted the previous
                    pUploadHandleInfo->state =
                        WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) ? UPLOAD_HANDLE_STATE_TERMINATING : UPLOAD_HANDLE_STATE_TERMINATED;

                    // Indicate to send EOS in the next iteration
                    pKinesisVideoStream->eosTracker.send = TRUE;
                    pKinesisVideoStream->eosTracker.offset = eosSent ? pKinesisVideoStream->eosTracker.size : 0;

                    // If we have eosSent then we can't append the last metadata afterwards as it will break the MKV cluster
                    if (eosSent) {
                        pKinesisVideoStream->metadataTracker.send = FALSE;
                    }
                }
            }
        } else {
            // Now, we can stream enough data out if we don't have a zero item.
            // NOTE: The previous branch already handles offset == length so this is a defensive check only.
            CHK(pKinesisVideoStream->curViewItem.offset != pKinesisVideoStream->curViewItem.viewItem.length, STATUS_NO_MORE_DATA_AVAILABLE);

            // Set the key frame indicator on fragment or session start to track the persist ACKs
            if (CHECK_ITEM_FRAGMENT_START(pKinesisVideoStream->curViewItem.viewItem.flags) ||
                CHECK_ITEM_STREAM_START(pKinesisVideoStream->curViewItem.viewItem.flags)) {
                pUploadHandleInfo->lastFragmentTs = pKinesisVideoStream->curViewItem.viewItem.ackTimestamp;
            }

            // Lock the client
            pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
            clientLocked = TRUE;

            // Fill the rest of the buffer of the current view item first
            // Map the storage
            CHK_STATUS(heapMap(pKinesisVideoClient->pHeap, pKinesisVideoStream->curViewItem.viewItem.handle, (PVOID*) &pAlloc, &allocSize));

            // Track the mapping so the validation failures below still release it in the CleanUp
            heapMapped = TRUE;

            CHK(allocSize < MAX_UINT32 && (UINT32) allocSize >= pKinesisVideoStream->curViewItem.viewItem.length, STATUS_INVALID_ALLOCATION_SIZE);

            // Validate we had allocated enough storage just in case
            CHK(pKinesisVideoStream->curViewItem.viewItem.length - pKinesisVideoStream->curViewItem.offset <= allocSize,
                STATUS_VIEW_ITEM_SIZE_GREATER_THAN_ALLOCATION);

            // Copy as much as we can
            size = MIN(remainingSize, pKinesisVideoStream->curViewItem.viewItem.length - pKinesisVideoStream->curViewItem.offset);
            MEMCPY(pCurPnt, pAlloc + pKinesisVideoStream->curViewItem.offset, size);

            // Unmap the storage for the frame.
            // The flag is cleared first so a failed unmap is not attempted again in the CleanUp.
            heapMapped = FALSE;
            CHK_STATUS(heapUnmap(pKinesisVideoClient->pHeap, ((PVOID) pAlloc)));

            // unLock the client
            pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
            clientLocked = FALSE;

            // Set the values
            pKinesisVideoStream->curViewItem.offset += size;
            pCurPnt += size;
            remainingSize -= size;
            *pFillSize += size;
        }
    } while (remainingSize != 0);

CleanUp:

    // The stream lock is acquired right after the argument validation so not holding it here
    // means the arguments were rejected. In that case the stream, the client and the fill size pointers
    // can't be trusted and nothing has been modified - return without any post-processing.
    if (!streamLocked) {
        LEAVES();
        return retStatus;
    }

    // IMPORTANT: No CHK/CHK_STATUS macros are used past this point. They jump back to the CleanUp label
    // which would re-run the post-processing and could loop on a persistent failure.
    // Failures are captured in cleanupStatus and stored in retStatus instead.

    // Release the mapping if we bailed out between the map and unmap calls. The client lock is still held.
    if (heapMapped) {
        heapMapped = FALSE;
        cleanupStatus = heapUnmap(pKinesisVideoClient->pHeap, ((PVOID) pAlloc));
        if (STATUS_FAILED(cleanupStatus)) {
            // Best effort - retStatus already carries the failure which brought us here
            DLOGW("Failed to unmap the view item allocation for stream upload handle %" PRIu64, uploadHandle);
        }
    }

    // Run staleness detection if we have retrieved any data
    if (*pFillSize != 0) {
        stalenessCheckStatus = checkForConnectionStaleness(pKinesisVideoStream, &pKinesisVideoStream->curViewItem.viewItem);
    }

    if (retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS) {
        // Replace it with a client side error
        retStatus = STATUS_NO_MORE_DATA_AVAILABLE;
    }

    // Calculate the metrics for transfer rate on successful call and when we do have some data.
    if (pKinesisVideoStream->streamInfo.streamCaps.recalculateMetrics && IS_VALID_GET_STREAM_DATA_STATUS(retStatus) && *pFillSize != 0) {
        // Calculate the current transfer rate only after the first iteration
        currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
        pKinesisVideoStream->diagnostics.accumulatedByteCount += *pFillSize;

        // Store the overall transferred byte count
        pKinesisVideoStream->diagnostics.transferredBytes += *pFillSize;

        if (!restarted) {
            // Calculate the delta time in seconds
            deltaInSeconds = (DOUBLE)(currentTime - pKinesisVideoStream->diagnostics.lastTransferRateTimestamp) / HUNDREDS_OF_NANOS_IN_A_SECOND;

            if (deltaInSeconds > TRANSFER_RATE_MEASURING_INTERVAL_EPSILON) {
                transferRate = pKinesisVideoStream->diagnostics.accumulatedByteCount / deltaInSeconds;

                // Update the current transfer rate
                pKinesisVideoStream->diagnostics.currentTransferRate =
                    (UINT64) EMA_ACCUMULATOR_GET_NEXT(pKinesisVideoStream->diagnostics.currentTransferRate, transferRate);

                // Zero out for next time measuring
                pKinesisVideoStream->diagnostics.accumulatedByteCount = 0;

                // Store the timestamp of this measurement
                pKinesisVideoStream->diagnostics.lastTransferRateTimestamp = currentTime;
            }
        } else {
            // After a restart only re-base the measurement timestamp
            pKinesisVideoStream->diagnostics.lastTransferRateTimestamp = currentTime;
        }
    }

    // Special handling for stopped stream when the retention period is zero or no more data available
    if (pKinesisVideoStream->streamStopped) {
        // Trigger stream closed function when we don't need to wait for the persisted ack
        // Or if we do need to wait for the ack and the state of the upload handler is terminated
        if (retStatus == STATUS_END_OF_STREAM &&
            (!WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) ||
             (pUploadHandleInfo != NULL && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED))) {
            // Get the duration and the size
            cleanupStatus = getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize);

            // Get the number of non terminated handle.
            if (!STATUS_FAILED(cleanupStatus)) {
                cleanupStatus = stackQueueGetCount(pKinesisVideoStream->pUploadInfoQueue, &uploadHandleCount);
            }

            // If there is no more data to send and current handle is the last one, wrap up by calling streamClosedFn.
            if (!STATUS_FAILED(cleanupStatus) && viewByteSize == 0 && uploadHandleCount == 1) {
                cleanupStatus = notifyStreamClosed(pKinesisVideoStream, uploadHandle);
            }

            if (STATUS_FAILED(cleanupStatus)) {
                retStatus = cleanupStatus;
            }
        }
    }

    // when pUploadHandleInfo is in UPLOAD_HANDLE_STATE_TERMINATED or UPLOAD_HANDLE_STATE_AWAITING_ACK, it means current handle
    // has sent all the data. So it's safe to unblock the next one if any. If WAIT_FOR_PERSISTED_ACK is enabled, no need
    // to execute data available again when UPLOAD_HANDLE_STATE_TERMINATED because we did it already when upload handle
    // was in UPLOAD_HANDLE_STATE_AWAITING_ACK state.
    if (NULL != pUploadHandleInfo &&
        (pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_AWAITING_ACK ||
         (!WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED))) {
        // Special handling for the case:
        // When the stream has been stopped so no more putFrame calls that drive the data availability
        // When the current upload handle is in EoS
        // When in offline mode, and the putFrame thread has been blocked so no more putFrame calls that drive
        // the data availability
        // When we still have ready upload handles which are awaiting

        // We need to trigger their data ready callback to enable them

        // Get the next upload handle that is ready to stream.
        // Next upload handle could be in NEW state if there is a delay in putStreamResult
        pNextUploadHandleInfo =
            getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_NEW | UPLOAD_HANDLE_STATE_READY | UPLOAD_HANDLE_STATE_STREAMING);

        // Notify the awaiting handle to enable it
        if (pNextUploadHandleInfo != NULL) {
            // Get the duration and the size
            cleanupStatus = getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize);

            if (!STATUS_FAILED(cleanupStatus)) {
                // If there no more item for the next handle to send and stream has stopped, set its state to terminated
                // and poke the handle. The handle is supposed to terminate gracefully and poke the next handle if there is any.
                if (viewByteSize == 0 && pKinesisVideoStream->streamStopped) {
                    pNextUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                }

                cleanupStatus = pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
                    pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
                    pNextUploadHandleInfo->handle, duration, viewByteSize);
            }

            if (STATUS_FAILED(cleanupStatus)) {
                retStatus = cleanupStatus;
            }
        }
    }

    // Remove the upload handle if it is in UPLOAD_HANDLE_STATE_TERMINATED state
    if (NULL != pUploadHandleInfo && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED) {
        deleteStreamUploadInfo(pKinesisVideoStream, pUploadHandleInfo);

        // if there is no more active upload handle and we still have bytes to transfer,
        // call kinesisVideoStreamResetConnection to create new upload session.
        pUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_ACTIVE);

        if (pUploadHandleInfo == NULL) {
            // Get the duration and the size. The size is only meaningful if the call succeeded.
            // The reset is best effort and does not affect the returned status.
            if (!STATUS_FAILED(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize)) && viewByteSize != 0) {
                kinesisVideoStreamResetConnection(TO_STREAM_HANDLE(pKinesisVideoStream));
            }
        }
    }

    // A status produced by the post-processing above gets the same client side translation
    if (retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS) {
        retStatus = STATUS_NO_MORE_DATA_AVAILABLE;
    }

    // The client lock is only still held here if we bailed out while copying from the heap
    if (clientLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    }

    if (streamLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    }

    // Return the staleness check status if it's not a success
    if (STATUS_FAILED(stalenessCheckStatus)) {
        retStatus = stalenessCheckStatus;
    }

    LEAVES();
    return retStatus;
}