STATUS putFrame()

in src/client/src/Stream.c [725:1120]


STATUS putFrame(PKinesisVideoStream pKinesisVideoStream, PFrame pFrame)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PKinesisVideoClient pKinesisVideoClient = NULL;
    ALLOCATION_HANDLE allocHandle = INVALID_ALLOCATION_HANDLE_VALUE;
    UINT64 remainingSize = 0, remainingDuration = 0, thresholdPercent = 0, duration = 0, viewByteSize = 0, allocSize = 0;
    PBYTE pAlloc = NULL;
    UINT32 trackIndex, packagedSize = 0, packagedMetadataSize = 0, overallSize = 0, itemFlags = ITEM_FLAG_NONE;
    BOOL streamLocked = FALSE, clientLocked = FALSE, freeOnError = TRUE;
    EncodedFrameInfo encodedFrameInfo;
    MKV_STREAM_STATE generatorState = MKV_STATE_START_BLOCK;
    UINT64 currentTime = INVALID_TIMESTAMP_VALUE;
    DOUBLE frameRate, deltaInSeconds;
    PViewItem pViewItem = NULL;
    PUploadHandleInfo pUploadHandleInfo;
    UINT64 windowDuration, currentDuration;
    PTrackInfo pTrackInfo = NULL;

    CHK(pKinesisVideoStream != NULL && pFrame != NULL, STATUS_NULL_ARG);
    pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;

    if (!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
        // Lookup the track that pFrame belongs to
        CHK_STATUS(mkvgenGetTrackInfo(pKinesisVideoStream->streamInfo.streamCaps.trackInfoList,
                                      pKinesisVideoStream->streamInfo.streamCaps.trackInfoCount, pFrame->trackId, &pTrackInfo, &trackIndex));
    }

    // Check if the stream has been stopped
    CHK(!pKinesisVideoStream->streamStopped, STATUS_STREAM_HAS_BEEN_STOPPED);

    // Lock the stream
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    streamLocked = TRUE;

    fixupFrame(pFrame);

    // Set the last PutFrame time to current time
    currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);

    if (pKinesisVideoStream->eofrFrame) {
        // After EOFR we again need to skip non-key frames before starting up new fragment
        pKinesisVideoStream->skipNonKeyFrames = TRUE;
        // Check we're not seeing an EOFR frame immediately after an EOFR frame
        CHK(!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags), STATUS_MULTIPLE_CONSECUTIVE_EOFR);
    }

    // Validate that we are not seeing EoFr explicit marker in a non-key-frame fragmented stream
    if (!pKinesisVideoStream->streamInfo.streamCaps.keyFrameFragmentation) {
        CHK(!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags), STATUS_END_OF_FRAGMENT_FRAME_INVALID_STATE);
    }

    // Need to check whether the streaming token has expired.
    // If we have the streaming token and it's in the grace period
    // then we need to go back to the get streaming end point and
    // get the streaming end point and the new streaming token.
    CHK_STATUS(checkStreamingTokenExpiration(pKinesisVideoStream));

    // Check if we have passed the delay for resetting generator.
    if (IS_VALID_TIMESTAMP(pKinesisVideoStream->resetGeneratorTime)) {
        currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
        if (currentTime >= pKinesisVideoStream->resetGeneratorTime) {
            pKinesisVideoStream->resetGeneratorTime = INVALID_TIMESTAMP_VALUE;
            pKinesisVideoStream->resetGeneratorOnKeyFrame = TRUE;
        }
    }

    // NOTE: If the connection has been reset we need to start from a new header
    if (pKinesisVideoStream->streamState == STREAM_STATE_NEW && pKinesisVideoStream->streamReady) {
        // Step the state machine once to get out of the Ready state
        CHK_STATUS(stepStateMachine(pKinesisVideoStream->base.pStateMachine));
    }

    // if we need to reset the generator on the next key frame (during the rotation only)
    if (pKinesisVideoStream->resetGeneratorOnKeyFrame && CHECK_FRAME_FLAG_KEY_FRAME(pFrame->flags)) {
        CHK_STATUS(mkvgenResetGenerator(pKinesisVideoStream->pMkvGenerator));
        pKinesisVideoStream->resetGeneratorOnKeyFrame = FALSE;
    }

    // Check if the frames should be skipped and whether the skip flag needs to be reset
    // If current frame is EOFR, We will package EOFR frame and we do not need to reset skip flag
    if (pKinesisVideoStream->skipNonKeyFrames) {
        if (CHECK_FRAME_FLAG_KEY_FRAME(pFrame->flags)) {
            pKinesisVideoStream->skipNonKeyFrames = FALSE;
        } else if (!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
            pKinesisVideoStream->diagnostics.skippedFrames++;
            CHK(FALSE, retStatus); // skip processing the frame
        }
    }

    // Package and store the frame.
    // If the frame is a special End-of-Fragment indicator
    // then we need to package the not yet sent metadata with EoFr metadata
    // and indicate the new cluster boundary

    // Currently, we are storing every frame in a separate allocation
    // We will need to optimize for storing perhaps an entire cluster but
    // Not right now as we need to deal with mapping/unmapping and allocating
    // the cluster storage up-front which might be more hassle than needed.

    if (CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
        // We will append the EoFr tag and package the tags
        CHK_STATUS(appendValidatedMetadata(pKinesisVideoStream, (PCHAR) EOFR_METADATA_NAME, (PCHAR) "", FALSE, pKinesisVideoStream->eosTracker.size));

        // Package the not-applied metadata as the frame bits
        CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, TRUE, NULL, &packagedSize));
    } else {
        // Get the size of the packaged frame
        CHK_STATUS(mkvgenPackageFrame(pKinesisVideoStream->pMkvGenerator, pFrame, pTrackInfo, NULL, &packagedSize, &encodedFrameInfo));

        // Preserve the current stream state as it might change after we apply the metadata
        generatorState = encodedFrameInfo.streamState;

        // Need to handle the metadata if any.
        // NOTE: The accumulated metadata will be inserted before the fragment start due to MKV limitation
        // as tags are level 1 and will break MKV clusters.
        if (generatorState == MKV_STATE_START_STREAM || generatorState == MKV_STATE_START_CLUSTER) {
            // Calculate the size of the metadata first
            CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, FALSE, NULL, &packagedMetadataSize));
        }
    }

    // Overall frame allocation size
    overallSize = packagedSize + packagedMetadataSize;

    pKinesisVideoStream->maxFrameSizeSeen = MAX(pKinesisVideoStream->maxFrameSizeSeen, overallSize);

    // Might need to block on the availability in the OFFLINE mode
    CHK_STATUS(handleAvailability(pKinesisVideoStream, overallSize, &allocHandle));

    if (IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
        // offline streaming mode can block so we need to reset the currentTime just in case
        currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
    }

    // Lock the client
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    clientLocked = TRUE;

    // Ensure we have space and if not then bail
    CHK(IS_VALID_ALLOCATION_HANDLE(allocHandle), STATUS_STORE_OUT_OF_MEMORY);

    // Map the storage
    CHK_STATUS(heapMap(pKinesisVideoClient->pHeap, allocHandle, (PVOID*) &pAlloc, &allocSize));

    // Validate we had allocated enough storage just in case
    CHK(overallSize <= allocSize, STATUS_ALLOCATION_SIZE_SMALLER_THAN_REQUESTED);

    // Check if we are packaging special EoFr
    if (CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
        // Store the metadata at the beginning of the allocation
        CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, TRUE, pAlloc, &packagedSize));

        // Synthesize the encodedFrameInfo
        CHK_STATUS(mkvgenGetCurrentTimestamps(pKinesisVideoStream->pMkvGenerator, &encodedFrameInfo.streamStartTs, &encodedFrameInfo.clusterPts,
                                              &encodedFrameInfo.clusterDts));
        encodedFrameInfo.dataOffset = 0;
        encodedFrameInfo.streamState = MKV_STATE_START_BLOCK;

        // Pre-set the item flags to differentiate EoFr in EoS case
        SET_ITEM_FRAGMENT_END(itemFlags);

        // Synthesize the frame timestamps based on the last content view entry
        CHK_STATUS(contentViewGetHead(pKinesisVideoStream->pView, &pViewItem));
        encodedFrameInfo.frameDts = pViewItem->timestamp + pViewItem->duration - encodedFrameInfo.clusterDts;
        encodedFrameInfo.framePts = pViewItem->ackTimestamp + pViewItem->duration - encodedFrameInfo.clusterPts;
        encodedFrameInfo.duration = 0;

        // Set the EoFr flag so we won't append not-sent metadata/EOS on StreamStop
        pKinesisVideoStream->eofrFrame = TRUE;
    } else {
        // Actually package the bits in the storage
        CHK_STATUS(mkvgenPackageFrame(pKinesisVideoStream->pMkvGenerator, pFrame, pTrackInfo, pAlloc, &packagedSize, &encodedFrameInfo));

        // Package the metadata if specified
        if (packagedMetadataSize != 0) {
            // Move the packaged bits out first to make room for the metadata
            // NOTE: need to use MEMMOVE due to the overlapping ranges
            MEMMOVE(pAlloc + encodedFrameInfo.dataOffset + packagedMetadataSize, pAlloc + encodedFrameInfo.dataOffset,
                    packagedSize - encodedFrameInfo.dataOffset);

            // Metadata will be packaged after the MKV header but before the cluster
            CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, FALSE, pAlloc + encodedFrameInfo.dataOffset,
                                             &packagedMetadataSize));
        }
    }

    // Unmap the storage for the frame
    CHK_STATUS(heapUnmap(pKinesisVideoClient->pHeap, ((PVOID) pAlloc)));

    // Check for storage pressures. No need for offline mode as the media pipeline will be blocked when there
    // is not enough storage
    if (!IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
        remainingSize = pKinesisVideoClient->pHeap->heapLimit - pKinesisVideoClient->pHeap->heapSize;
        thresholdPercent = (UINT32)(((DOUBLE) remainingSize / pKinesisVideoClient->pHeap->heapLimit) * 100);

        if (thresholdPercent <= STORAGE_PRESSURE_NOTIFICATION_THRESHOLD) {
            pKinesisVideoStream->diagnostics.storagePressures++;

            if (pKinesisVideoClient->clientCallbacks.storageOverflowPressureFn != NULL) {
                // Notify the client app about buffer pressure
                CHK_STATUS(
                    pKinesisVideoClient->clientCallbacks.storageOverflowPressureFn(pKinesisVideoClient->clientCallbacks.customData, remainingSize));
            }
        }

        // No need to report buffer duration overflow in offline since the putFrame thread will be blocked.
        // Only report buffer duration overflow if retention is non-zero, since if retention is zero, there will be no
        // persisted ack and buffer will drop off tail all the time.
        if (pKinesisVideoStream->streamInfo.retention != RETENTION_PERIOD_SENTINEL &&
            pKinesisVideoClient->clientCallbacks.bufferDurationOverflowPressureFn != NULL) {
            CHK_STATUS(contentViewGetWindowDuration(pKinesisVideoStream->pView, &currentDuration, &windowDuration));

            // Check for buffer duration pressure. Note that streamCaps.bufferDuration will never be 0.
            remainingDuration = pKinesisVideoStream->streamInfo.streamCaps.bufferDuration - windowDuration;
            thresholdPercent = (UINT32)(((DOUBLE) remainingDuration / pKinesisVideoStream->streamInfo.streamCaps.bufferDuration) * 100);
            if (thresholdPercent <= BUFFER_DURATION_PRESSURE_NOTIFICATION_THRESHOLD) {
                pKinesisVideoStream->diagnostics.bufferPressures++;

                // Notify the client app about buffer pressure
                CHK_STATUS(pKinesisVideoClient->clientCallbacks.bufferDurationOverflowPressureFn(
                    pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), remainingDuration));
            }
        }
    }

    // Generate the view flags
    switch (generatorState) {
        case MKV_STATE_START_STREAM:
            SET_ITEM_STREAM_START(itemFlags);
            SET_ITEM_STREAM_START_DEBUG(itemFlags);
            // fall-through
        case MKV_STATE_START_CLUSTER:
            SET_ITEM_FRAGMENT_START(itemFlags);

            // Clear the EoFr flag
            pKinesisVideoStream->eofrFrame = FALSE;
            break;
        case MKV_STATE_START_BLOCK:
            break;
    }

    if (CHECK_ITEM_FRAGMENT_START(itemFlags) && pKinesisVideoClient->deviceInfo.clientInfo.logMetric) {
        currentTime = IS_VALID_TIMESTAMP(currentTime)
            ? currentTime
            : pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);

        if (currentTime >= pKinesisVideoStream->diagnostics.nextLoggingTime) {
            // unlock the client
            pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
            clientLocked = FALSE;

            // unlock the stream
            pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
            streamLocked = FALSE;

            if (STATUS_FAILED(logStreamMetric(pKinesisVideoStream))) {
                DLOGW("Failed to log stream metric with error 0x%08x", retStatus);
            }
            // lock the stream
            pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
            streamLocked = TRUE;

            // lock the client
            pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
            clientLocked = TRUE;
        }

        // Update the next log time
        pKinesisVideoStream->diagnostics.nextLoggingTime = currentTime + pKinesisVideoClient->deviceInfo.clientInfo.metricLoggingPeriod;
    }

    // Put the frame into the view.
    // NOTE: For the timestamp we will specify the cluster timestamp + frame timestamp which
    // will be useful later to find the start of the fragment corresponding to the ACK timecode.
    // We will also use the a non simple block state as a fragment start indicator - an IDR frame.
    // NOTE: We are using the decoding timestamp as our item timestamp.
    // Although, the intra-cluster pts might vary from dts, the cluster start frame pts
    // will be equal to dts which is important when processing ACKs.
    // NOTE: The size of the actual data might be smaller than the allocated size due to the fact that
    // calculating the packaged metadata and frame might have preserved the MKV header twice as the calculation
    // of the size doesn't mutate the state of the generator. We will use the actual packaged size.
    CHK_STATUS(contentViewAddItem(pKinesisVideoStream->pView, encodedFrameInfo.clusterDts + encodedFrameInfo.frameDts,
                                  encodedFrameInfo.clusterPts + encodedFrameInfo.framePts, encodedFrameInfo.duration, allocHandle,
                                  encodedFrameInfo.dataOffset, overallSize, itemFlags));

    // From now on we don't need to free the allocation as it's in the view already and will be collected
    freeOnError = FALSE;

    if (CHECK_ITEM_STREAM_START(itemFlags)) {
        // Store the stream start timestamp for ACK timecode adjustment for relative cluster timecode streams
        pKinesisVideoStream->newSessionTimestamp = encodedFrameInfo.streamStartTs;
        CHK_STATUS(contentViewGetHead(pKinesisVideoStream->pView, &pViewItem));
        pKinesisVideoStream->newSessionIndex = pViewItem->index;
    }

    // We need to check for the latency pressure. If the view head is ahead of the current
    // for more than the specified max latency then we need to call the optional user callback.
    // NOTE: A special sentinel value is used to determine whether the latency is specified.
    if (pKinesisVideoStream->streamInfo.streamCaps.maxLatency != STREAM_LATENCY_PRESSURE_CHECK_SENTINEL &&
        pKinesisVideoClient->clientCallbacks.streamLatencyPressureFn != NULL) {
        // Get the window duration from the view
        CHK_STATUS(contentViewGetWindowDuration(pKinesisVideoStream->pView, &duration, NULL));

        // Check for the breach and invoke the user provided callback
        if (duration > pKinesisVideoStream->streamInfo.streamCaps.maxLatency) {
            pKinesisVideoStream->diagnostics.latencyPressures++;

            CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamLatencyPressureFn(pKinesisVideoClient->clientCallbacks.customData,
                                                                                    TO_STREAM_HANDLE(pKinesisVideoStream), duration));
        }
    }

    // Notify about data is available
    pUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_READY | UPLOAD_HANDLE_STATE_STREAMING);
    if (NULL != pUploadHandleInfo && IS_VALID_UPLOAD_HANDLE(pUploadHandleInfo->handle)) {
        // Get the duration and the size
        CHK_STATUS(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize));

        // Call the notification callback
        CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
            pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
            pUploadHandleInfo->handle, duration, viewByteSize));
    }

    // Recalculate frame rate if enabled
    if (pKinesisVideoStream->streamInfo.streamCaps.recalculateMetrics) {
        // Calculate the current frame rate only after the first iteration
        currentTime = IS_VALID_TIMESTAMP(currentTime)
            ? currentTime
            : pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);

        if (pTrackInfo != NULL && pTrackInfo->trackType == MKV_TRACK_INFO_TYPE_VIDEO) {
            if (!CHECK_ITEM_STREAM_START(itemFlags)) {
                // Calculate the delta time in seconds
                deltaInSeconds = (DOUBLE)(currentTime - pKinesisVideoStream->diagnostics.lastFrameRateTimestamp) / HUNDREDS_OF_NANOS_IN_A_SECOND;
                if (deltaInSeconds != 0) {
                    frameRate = 1 / deltaInSeconds;

                    // Update the current frame rate
                    pKinesisVideoStream->diagnostics.currentFrameRate =
                        EMA_ACCUMULATOR_GET_NEXT(pKinesisVideoStream->diagnostics.currentFrameRate, frameRate);
                }

                // Update elementaryFrameRate.
                deltaInSeconds = (DOUBLE)(pFrame->presentationTs - pKinesisVideoStream->diagnostics.previousFrameRatePts) / HUNDREDS_OF_NANOS_IN_A_SECOND;
                if(deltaInSeconds != 0) {
                    pKinesisVideoStream->diagnostics.elementaryFrameRate = 1/deltaInSeconds;
                }
            }
            // For first putFrame call, we only store the Pts and not perform any computation
            pKinesisVideoStream->diagnostics.previousFrameRatePts = pFrame->presentationTs;
            // Store the last frame timestamp
            pKinesisVideoStream->diagnostics.lastFrameRateTimestamp = currentTime;
        }
    }

    // Only update the timestamp on success
    if (pKinesisVideoStream->eofrFrame) {
        pKinesisVideoStream->lastPutFrameTimestamp = INVALID_TIMESTAMP_VALUE;
    } else {
        pKinesisVideoStream->lastPutFrameTimestamp = currentTime;
    }
    // Unlock the client as we no longer need it locked
    pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    clientLocked = FALSE;

    // Unlock the stream (even though it will be unlocked in  cleanup)
    pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    streamLocked = FALSE;

CleanUp:

    // We need to see whether we need to remove the allocation on error. Otherwise, we will leak
    if (STATUS_FAILED(retStatus) && IS_VALID_ALLOCATION_HANDLE(allocHandle) && freeOnError) {
        // Lock the client if it's not locked
        if (!clientLocked) {
            pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
            clientLocked = TRUE;
        }

        // Free the actual allocation as we will leak otherwise.
        heapFree(pKinesisVideoClient->pHeap, allocHandle);
    }

    if (clientLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    }

    if (streamLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    }

    LEAVES();
    return retStatus;
}