in src/client/src/Stream.c [725:1120]
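/**
 * Packages the supplied frame into the MKV stream, stores it in the content store and adds it
 * to the content view, handling EoFr markers, pending metadata, storage/duration/latency
 * pressure notifications, data-available callbacks and frame-rate metrics along the way.
 */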
STATUS putFrame(PKinesisVideoStream pKinesisVideoStream, PFrame pFrame)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKinesisVideoClient pKinesisVideoClient = NULL;
ALLOCATION_HANDLE allocHandle = INVALID_ALLOCATION_HANDLE_VALUE;
UINT64 remainingSize = 0, remainingDuration = 0, thresholdPercent = 0, duration = 0, viewByteSize = 0, allocSize = 0;
PBYTE pAlloc = NULL;
UINT32 trackIndex, packagedSize = 0, packagedMetadataSize = 0, overallSize = 0, itemFlags = ITEM_FLAG_NONE;
BOOL streamLocked = FALSE, clientLocked = FALSE, freeOnError = TRUE;
EncodedFrameInfo encodedFrameInfo;
MKV_STREAM_STATE generatorState = MKV_STATE_START_BLOCK;
UINT64 currentTime = INVALID_TIMESTAMP_VALUE;
DOUBLE frameRate, deltaInSeconds;
PViewItem pViewItem = NULL;
PUploadHandleInfo pUploadHandleInfo;
UINT64 windowDuration, currentDuration;
PTrackInfo pTrackInfo = NULL;
CHK(pKinesisVideoStream != NULL && pFrame != NULL, STATUS_NULL_ARG);
pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;
if (!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
// Lookup the track that pFrame belongs to
CHK_STATUS(mkvgenGetTrackInfo(pKinesisVideoStream->streamInfo.streamCaps.trackInfoList,
pKinesisVideoStream->streamInfo.streamCaps.trackInfoCount, pFrame->trackId, &pTrackInfo, &trackIndex));
}
// Check if the stream has been stopped
CHK(!pKinesisVideoStream->streamStopped, STATUS_STREAM_HAS_BEEN_STOPPED);
// Lock the stream
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
streamLocked = TRUE;
fixupFrame(pFrame);
// Set the last PutFrame time to current time
currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
if (pKinesisVideoStream->eofrFrame) {
// After an EoFr we again need to skip non-key frames before starting a new fragment
pKinesisVideoStream->skipNonKeyFrames = TRUE;
// Check we're not seeing an EOFR frame immediately after an EOFR frame
CHK(!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags), STATUS_MULTIPLE_CONSECUTIVE_EOFR);
}
// Validate that we are not seeing an explicit EoFr marker in a non-key-frame fragmented stream
if (!pKinesisVideoStream->streamInfo.streamCaps.keyFrameFragmentation) {
CHK(!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags), STATUS_END_OF_FRAGMENT_FRAME_INVALID_STATE);
}
// Check whether the streaming token has expired.
// If we hold a streaming token that has entered its grace period,
// we need to go back through the get-streaming-endpoint step to
// obtain a new streaming endpoint and a new streaming token.
CHK_STATUS(checkStreamingTokenExpiration(pKinesisVideoStream));
// Check if we have passed the delay for resetting generator.
if (IS_VALID_TIMESTAMP(pKinesisVideoStream->resetGeneratorTime)) {
currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
if (currentTime >= pKinesisVideoStream->resetGeneratorTime) {
pKinesisVideoStream->resetGeneratorTime = INVALID_TIMESTAMP_VALUE;
pKinesisVideoStream->resetGeneratorOnKeyFrame = TRUE;
}
}
// NOTE: If the connection has been reset we need to start from a new header
if (pKinesisVideoStream->streamState == STREAM_STATE_NEW && pKinesisVideoStream->streamReady) {
// Step the state machine once to get out of the Ready state
CHK_STATUS(stepStateMachine(pKinesisVideoStream->base.pStateMachine));
}
// if we need to reset the generator on the next key frame (during the rotation only)
if (pKinesisVideoStream->resetGeneratorOnKeyFrame && CHECK_FRAME_FLAG_KEY_FRAME(pFrame->flags)) {
CHK_STATUS(mkvgenResetGenerator(pKinesisVideoStream->pMkvGenerator));
pKinesisVideoStream->resetGeneratorOnKeyFrame = FALSE;
}
// Check whether frames should be skipped and whether the skip flag needs to be reset.
// If the current frame is an EoFr, we will still package it and do not need to reset the skip flag.
if (pKinesisVideoStream->skipNonKeyFrames) {
if (CHECK_FRAME_FLAG_KEY_FRAME(pFrame->flags)) {
pKinesisVideoStream->skipNonKeyFrames = FALSE;
} else if (!CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
pKinesisVideoStream->diagnostics.skippedFrames++;
CHK(FALSE, retStatus); // skip processing the frame
}
}
// Package and store the frame.
// If the frame is the special End-of-Fragment indicator, we package the
// not-yet-sent metadata together with the EoFr metadata and indicate
// the new cluster boundary.
// Currently, we store every frame in a separate allocation.
// Storing an entire cluster per allocation might be an optimization later,
// but it would require mapping/unmapping and allocating the cluster storage
// up-front, which is more hassle than it's worth right now.
if (CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
// We will append the EoFr tag and package the tags
CHK_STATUS(appendValidatedMetadata(pKinesisVideoStream, (PCHAR) EOFR_METADATA_NAME, (PCHAR) "", FALSE, pKinesisVideoStream->eosTracker.size));
// Package the not-applied metadata as the frame bits
CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, TRUE, NULL, &packagedSize));
} else {
// Get the size of the packaged frame
CHK_STATUS(mkvgenPackageFrame(pKinesisVideoStream->pMkvGenerator, pFrame, pTrackInfo, NULL, &packagedSize, &encodedFrameInfo));
// Preserve the current stream state as it might change after we apply the metadata
generatorState = encodedFrameInfo.streamState;
// Need to handle the metadata if any.
// NOTE: The accumulated metadata will be inserted before the fragment start due to MKV limitation
// as tags are level 1 and will break MKV clusters.
if (generatorState == MKV_STATE_START_STREAM || generatorState == MKV_STATE_START_CLUSTER) {
// Calculate the size of the metadata first
CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, FALSE, NULL, &packagedMetadataSize));
}
}
// Overall frame allocation size
overallSize = packagedSize + packagedMetadataSize;
pKinesisVideoStream->maxFrameSizeSeen = MAX(pKinesisVideoStream->maxFrameSizeSeen, overallSize);
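// NOTE: The calls above were size-only calculation passes (NULL output buffer); the actual bits are
// packaged into the mapped store allocation further below.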
// Might need to block on the availability in the OFFLINE mode
CHK_STATUS(handleAvailability(pKinesisVideoStream, overallSize, &allocHandle));
if (IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
// offline streaming mode can block so we need to reset the currentTime just in case
currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
}
// Lock the client
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
clientLocked = TRUE;
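// NOTE: The stream lock is always acquired before the client lock and the two are released in
// reverse order (see CleanUp), keeping the lock ordering consistent.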
// Ensure we have space and if not then bail
CHK(IS_VALID_ALLOCATION_HANDLE(allocHandle), STATUS_STORE_OUT_OF_MEMORY);
// Map the storage
CHK_STATUS(heapMap(pKinesisVideoClient->pHeap, allocHandle, (PVOID*) &pAlloc, &allocSize));
// Validate we had allocated enough storage just in case
CHK(overallSize <= allocSize, STATUS_ALLOCATION_SIZE_SMALLER_THAN_REQUESTED);
// Check if we are packaging special EoFr
if (CHECK_FRAME_FLAG_END_OF_FRAGMENT(pFrame->flags)) {
// Store the metadata at the beginning of the allocation
CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, TRUE, pAlloc, &packagedSize));
// Synthesize the encodedFrameInfo
CHK_STATUS(mkvgenGetCurrentTimestamps(pKinesisVideoStream->pMkvGenerator, &encodedFrameInfo.streamStartTs, &encodedFrameInfo.clusterPts,
&encodedFrameInfo.clusterDts));
encodedFrameInfo.dataOffset = 0;
encodedFrameInfo.streamState = MKV_STATE_START_BLOCK;
// Pre-set the item flags so the EoFr can be differentiated in the EoS case
SET_ITEM_FRAGMENT_END(itemFlags);
// Synthesize the frame timestamps based on the last content view entry
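// The frame timestamps in encodedFrameInfo are relative to the cluster start, hence the cluster
// DTS/PTS are subtracted; the synthesized EoFr item lands right after the last item in the view.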
CHK_STATUS(contentViewGetHead(pKinesisVideoStream->pView, &pViewItem));
encodedFrameInfo.frameDts = pViewItem->timestamp + pViewItem->duration - encodedFrameInfo.clusterDts;
encodedFrameInfo.framePts = pViewItem->ackTimestamp + pViewItem->duration - encodedFrameInfo.clusterPts;
encodedFrameInfo.duration = 0;
// Set the EoFr flag so we won't append not-sent metadata/EOS on StreamStop
pKinesisVideoStream->eofrFrame = TRUE;
} else {
// Actually package the bits in the storage
CHK_STATUS(mkvgenPackageFrame(pKinesisVideoStream->pMkvGenerator, pFrame, pTrackInfo, pAlloc, &packagedSize, &encodedFrameInfo));
// Package the metadata if specified
if (packagedMetadataSize != 0) {
// Move the packaged bits out first to make room for the metadata
// NOTE: need to use MEMMOVE due to the overlapping ranges
MEMMOVE(pAlloc + encodedFrameInfo.dataOffset + packagedMetadataSize, pAlloc + encodedFrameInfo.dataOffset,
packagedSize - encodedFrameInfo.dataOffset);
// Metadata will be packaged after the MKV header but before the cluster
CHK_STATUS(packageStreamMetadata(pKinesisVideoStream, MKV_STATE_START_CLUSTER, FALSE, pAlloc + encodedFrameInfo.dataOffset,
&packagedMetadataSize));
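// Resulting layout of the allocation: [MKV header (dataOffset bytes) | packaged metadata | cluster header and frame data]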
}
}
// Unmap the storage for the frame
CHK_STATUS(heapUnmap(pKinesisVideoClient->pHeap, ((PVOID) pAlloc)));
// Check for storage pressures. No need for offline mode as the media pipeline will be blocked when there
// is not enough storage
if (!IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
remainingSize = pKinesisVideoClient->pHeap->heapLimit - pKinesisVideoClient->pHeap->heapSize;
thresholdPercent = (UINT32)(((DOUBLE) remainingSize / pKinesisVideoClient->pHeap->heapLimit) * 100);
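// Illustrative example: with a 100MB heap limit and 97MB in use, remainingSize is 3MB and
// thresholdPercent is 3, so a notification threshold of, say, 5 would trigger the callback below.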
if (thresholdPercent <= STORAGE_PRESSURE_NOTIFICATION_THRESHOLD) {
pKinesisVideoStream->diagnostics.storagePressures++;
if (pKinesisVideoClient->clientCallbacks.storageOverflowPressureFn != NULL) {
// Notify the client app about buffer pressure
CHK_STATUS(
pKinesisVideoClient->clientCallbacks.storageOverflowPressureFn(pKinesisVideoClient->clientCallbacks.customData, remainingSize));
}
}
// No need to report buffer duration overflow in offline since the putFrame thread will be blocked.
// Only report buffer duration overflow if retention is non-zero; with zero retention there are no
// persisted ACKs and the buffer simply keeps dropping off the tail.
if (pKinesisVideoStream->streamInfo.retention != RETENTION_PERIOD_SENTINEL &&
pKinesisVideoClient->clientCallbacks.bufferDurationOverflowPressureFn != NULL) {
CHK_STATUS(contentViewGetWindowDuration(pKinesisVideoStream->pView, &currentDuration, &windowDuration));
// Check for buffer duration pressure. Note that streamCaps.bufferDuration will never be 0.
remainingDuration = pKinesisVideoStream->streamInfo.streamCaps.bufferDuration - windowDuration;
thresholdPercent = (UINT32)(((DOUBLE) remainingDuration / pKinesisVideoStream->streamInfo.streamCaps.bufferDuration) * 100);
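// Illustrative example: with a 120-second buffer duration and a 110-second backlog, remainingDuration
// is 10 seconds and thresholdPercent is 8, so a threshold of, say, 10 would trigger the callback below.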
if (thresholdPercent <= BUFFER_DURATION_PRESSURE_NOTIFICATION_THRESHOLD) {
pKinesisVideoStream->diagnostics.bufferPressures++;
// Notify the client app about buffer pressure
CHK_STATUS(pKinesisVideoClient->clientCallbacks.bufferDurationOverflowPressureFn(
pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), remainingDuration));
}
}
}
// Generate the view flags
switch (generatorState) {
case MKV_STATE_START_STREAM:
SET_ITEM_STREAM_START(itemFlags);
SET_ITEM_STREAM_START_DEBUG(itemFlags);
// fall-through
case MKV_STATE_START_CLUSTER:
SET_ITEM_FRAGMENT_START(itemFlags);
// Clear the EoFr flag
pKinesisVideoStream->eofrFrame = FALSE;
break;
case MKV_STATE_START_BLOCK:
break;
}
if (CHECK_ITEM_FRAGMENT_START(itemFlags) && pKinesisVideoClient->deviceInfo.clientInfo.logMetric) {
currentTime = IS_VALID_TIMESTAMP(currentTime)
? currentTime
: pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
if (currentTime >= pKinesisVideoStream->diagnostics.nextLoggingTime) {
// unlock the client
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
clientLocked = FALSE;
// unlock the stream
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
streamLocked = FALSE;
if (STATUS_FAILED(logStreamMetric(pKinesisVideoStream))) {
DLOGW("Failed to log stream metric with error 0x%08x", retStatus);
}
// lock the stream
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
streamLocked = TRUE;
// lock the client
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
clientLocked = TRUE;
}
// Update the next log time
pKinesisVideoStream->diagnostics.nextLoggingTime = currentTime + pKinesisVideoClient->deviceInfo.clientInfo.metricLoggingPeriod;
}
// Put the frame into the view.
// NOTE: For the timestamp we will specify the cluster timestamp + frame timestamp which
// will be useful later to find the start of the fragment corresponding to the ACK timecode.
// We will also use a non-simple-block state as a fragment start indicator - an IDR frame.
// NOTE: We are using the decoding timestamp as our item timestamp.
// Although the intra-cluster PTS might differ from the DTS, the cluster start frame PTS
// will be equal to the DTS, which is important when processing ACKs.
// NOTE: The size of the actual data might be smaller than the allocated size because the
// size-calculation passes for the packaged metadata and frame might have accounted for the MKV header
// twice, as the size calculation doesn't mutate the state of the generator. We will use the actual packaged size.
CHK_STATUS(contentViewAddItem(pKinesisVideoStream->pView, encodedFrameInfo.clusterDts + encodedFrameInfo.frameDts,
encodedFrameInfo.clusterPts + encodedFrameInfo.framePts, encodedFrameInfo.duration, allocHandle,
encodedFrameInfo.dataOffset, overallSize, itemFlags));
// From now on we don't need to free the allocation as it's in the view already and will be collected
freeOnError = FALSE;
if (CHECK_ITEM_STREAM_START(itemFlags)) {
// Store the stream start timestamp for ACK timecode adjustment for relative cluster timecode streams
pKinesisVideoStream->newSessionTimestamp = encodedFrameInfo.streamStartTs;
CHK_STATUS(contentViewGetHead(pKinesisVideoStream->pView, &pViewItem));
pKinesisVideoStream->newSessionIndex = pViewItem->index;
}
// We need to check for the latency pressure. If the view head is ahead of the current
// for more than the specified max latency then we need to call the optional user callback.
// NOTE: A special sentinel value is used to determine whether the latency is specified.
if (pKinesisVideoStream->streamInfo.streamCaps.maxLatency != STREAM_LATENCY_PRESSURE_CHECK_SENTINEL &&
pKinesisVideoClient->clientCallbacks.streamLatencyPressureFn != NULL) {
// Get the window duration from the view
CHK_STATUS(contentViewGetWindowDuration(pKinesisVideoStream->pView, &duration, NULL));
// Check for the breach and invoke the user provided callback
if (duration > pKinesisVideoStream->streamInfo.streamCaps.maxLatency) {
pKinesisVideoStream->diagnostics.latencyPressures++;
CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamLatencyPressureFn(pKinesisVideoClient->clientCallbacks.customData,
TO_STREAM_HANDLE(pKinesisVideoStream), duration));
}
}
// Notify that data is available
pUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_READY | UPLOAD_HANDLE_STATE_STREAMING);
if (NULL != pUploadHandleInfo && IS_VALID_UPLOAD_HANDLE(pUploadHandleInfo->handle)) {
// Get the duration and the size
CHK_STATUS(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize));
// Call the notification callback
CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
pUploadHandleInfo->handle, duration, viewByteSize));
}
// Recalculate frame rate if enabled
if (pKinesisVideoStream->streamInfo.streamCaps.recalculateMetrics) {
// Calculate the current frame rate only after the first iteration
currentTime = IS_VALID_TIMESTAMP(currentTime)
? currentTime
: pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
if (pTrackInfo != NULL && pTrackInfo->trackType == MKV_TRACK_INFO_TYPE_VIDEO) {
if (!CHECK_ITEM_STREAM_START(itemFlags)) {
// Calculate the delta time in seconds
deltaInSeconds = (DOUBLE)(currentTime - pKinesisVideoStream->diagnostics.lastFrameRateTimestamp) / HUNDREDS_OF_NANOS_IN_A_SECOND;
if (deltaInSeconds != 0) {
frameRate = 1 / deltaInSeconds;
// Update the current frame rate
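// EMA_ACCUMULATOR_GET_NEXT blends the instantaneous rate into an exponential moving average
// (roughly next = alpha * frameRate + (1 - alpha) * previous, with the smoothing factor defined
// by the macro) so a single outlier frame interval does not swing the reported rate.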
pKinesisVideoStream->diagnostics.currentFrameRate =
EMA_ACCUMULATOR_GET_NEXT(pKinesisVideoStream->diagnostics.currentFrameRate, frameRate);
}
// Update elementaryFrameRate.
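// Unlike currentFrameRate above, which is based on the wall-clock interval between putFrame calls,
// the elementary frame rate is derived from the PTS delta, i.e., the media timeline.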
deltaInSeconds = (DOUBLE)(pFrame->presentationTs - pKinesisVideoStream->diagnostics.previousFrameRatePts) / HUNDREDS_OF_NANOS_IN_A_SECOND;
if (deltaInSeconds != 0) {
pKinesisVideoStream->diagnostics.elementaryFrameRate = 1 / deltaInSeconds;
}
}
// For the first putFrame call, we only store the PTS and do not perform any computation
pKinesisVideoStream->diagnostics.previousFrameRatePts = pFrame->presentationTs;
// Store the last frame timestamp
pKinesisVideoStream->diagnostics.lastFrameRateTimestamp = currentTime;
}
}
// Only update the timestamp on success
if (pKinesisVideoStream->eofrFrame) {
pKinesisVideoStream->lastPutFrameTimestamp = INVALID_TIMESTAMP_VALUE;
} else {
pKinesisVideoStream->lastPutFrameTimestamp = currentTime;
}
// Unlock the client as we no longer need it locked
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
clientLocked = FALSE;
// Unlock the stream (even though it will be unlocked in cleanup)
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
streamLocked = FALSE;
CleanUp:
// Check whether we need to remove the allocation on error; otherwise we would leak it
if (STATUS_FAILED(retStatus) && IS_VALID_ALLOCATION_HANDLE(allocHandle) && freeOnError) {
// Lock the client if it's not locked
if (!clientLocked) {
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
clientLocked = TRUE;
}
// Free the actual allocation as we will leak otherwise.
heapFree(pKinesisVideoClient->pHeap, allocHandle);
}
if (clientLocked) {
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
}
if (streamLocked) {
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
}
LEAVES();
return retStatus;
}
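/*
 * Illustrative caller sketch (not part of this file): the minimal Frame fields a producer
 * typically fills before submitting a frame through the public putKinesisVideoFrame() API,
 * which routes to putFrame() above. The stream handle, payload buffer, frame index, key-frame
 * flag and track id below are assumptions made for the example only.
 *
 *     Frame frame;
 *     frame.version = FRAME_CURRENT_VERSION;
 *     frame.index = frameIndex++;                                  // monotonically increasing
 *     frame.trackId = 1;                                           // hypothetical video track id
 *     frame.flags = isKeyFrame ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
 *     frame.decodingTs = timestamp;                                // 100ns units
 *     frame.presentationTs = timestamp;
 *     frame.duration = HUNDREDS_OF_NANOS_IN_A_SECOND / 25;         // e.g. 25 fps content
 *     frame.size = frameDataSize;
 *     frame.frameData = pFrameData;
 *     CHK_STATUS(putKinesisVideoFrame(streamHandle, &frame));
 */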