in src/client/src/Stream.c [1125:1561]
/**
 * Fills a caller-supplied buffer with the next bytes to upload for the given upload handle.
 *
 * The stream lock is held for the whole call; the client lock is additionally taken only while
 * the bytes of a view item are copied out of the heap. On each loop iteration the data is drained
 * in this order: the packaged metadata (only while the handle is in the sending-EoS state), the
 * packaged EoS, and otherwise the bytes of the current view item, moving on to the next view item
 * once the current one has been fully consumed.
 *
 * NOTE: The buffer can be partially filled even when a non-success status is returned, so the
 * caller should always consult *pFillSize.
 *
 * @param pKinesisVideoStream Stream object to read from. Must not be NULL and must reference a client.
 * @param uploadHandle        Upload handle the data is requested for. Must be a valid handle.
 * @param pBuffer             Destination buffer. Must not be NULL.
 * @param bufferSize          Size of the destination buffer in bytes. Must not be 0.
 * @param pFillSize           Receives the number of bytes written into pBuffer. Must not be NULL.
 *
 * @return STATUS_SUCCESS when the buffer has been filled completely, otherwise among others:
 *         STATUS_NULL_ARG / STATUS_INVALID_ARG    - argument validation failed (no state is touched),
 *         STATUS_UPLOAD_HANDLE_ABORTED            - the handle is unknown or in the error state,
 *         STATUS_NO_MORE_DATA_AVAILABLE           - nothing (more) to send at the moment,
 *         STATUS_AWAITING_PERSISTED_ACK           - EoS has been sent, the last persisted ACK is pending,
 *         STATUS_END_OF_STREAM                    - the handle is terminated.
 *         A failed connection staleness check overrides any of the above.
 */
STATUS getStreamData(PKinesisVideoStream pKinesisVideoStream, UPLOAD_HANDLE uploadHandle, PBYTE pBuffer, UINT32 bufferSize, PUINT32 pFillSize)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS, stalenessCheckStatus = STATUS_SUCCESS, cleanupStatus = STATUS_SUCCESS;
    PKinesisVideoClient pKinesisVideoClient = NULL;
    PViewItem pViewItem = NULL;
    PBYTE pAlloc = NULL;
    UINT32 size = 0, remainingSize = bufferSize, uploadHandleCount;
    UINT64 allocSize, currentTime, duration, viewByteSize;
    PBYTE pCurPnt = pBuffer;
    BOOL streamLocked = FALSE, clientLocked = FALSE, rollbackToLastAck, restarted = FALSE, eosSent = FALSE;
    DOUBLE transferRate, deltaInSeconds;
    PUploadHandleInfo pUploadHandleInfo = NULL, pNextUploadHandleInfo = NULL;

    CHK(pKinesisVideoStream != NULL && pKinesisVideoStream->pKinesisVideoClient != NULL && pBuffer != NULL && pFillSize != NULL, STATUS_NULL_ARG);
    CHK(bufferSize != 0 && IS_VALID_UPLOAD_HANDLE(uploadHandle), STATUS_INVALID_ARG);

    // Zero the fill size up-front so that every exit path below, including the early failures,
    // reports a defined value instead of whatever the caller's variable happened to contain.
    *pFillSize = 0;

    pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;

    // Lock the stream
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    streamLocked = TRUE;

    // If the connection state has the IN_USE bit set then we need to rollback the current view pointer
    if ((pKinesisVideoStream->connectionState & UPLOAD_CONNECTION_STATE_IN_USE) != UPLOAD_CONNECTION_STATE_NONE) {
        if (IS_OFFLINE_STREAMING_MODE(pKinesisVideoStream->streamInfo.streamCaps.streamingType)) {
            // In case of offline mode, we just need to set the current to the tail.
            CHK_STATUS(contentViewGetTail(pKinesisVideoStream->pView, &pViewItem));
            CHK_STATUS(contentViewSetCurrentIndex(pKinesisVideoStream->pView, pViewItem->index));
        } else {
            // Calculate the rollback duration based on the stream replay duration and the latest ACKs received.
            // If the acks are not enabled or we have potentially dead host situation then we rollback to the replay duration.
            // NOTE: The received ACK should be sequential and not out-of-order.
            rollbackToLastAck = pKinesisVideoStream->streamInfo.streamCaps.fragmentAcks &&
                CONNECTION_DROPPED_HOST_ALIVE(pKinesisVideoStream->connectionDroppedResult);

            CHK_STATUS(contentViewRollbackCurrent(pKinesisVideoStream->pView, pKinesisVideoStream->streamInfo.streamCaps.replayDuration, TRUE,
                                                  rollbackToLastAck));
        }

        // Fix-up the current element after the rollback
        CHK_STATUS(streamStartFixupOnReconnect(pKinesisVideoStream));

        // Remember the restart so the transfer rate is not computed across the reconnect
        restarted = TRUE;
    }

    // Reset the connection dropped indicator
    pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_OK;

    // Get the latest upload handle
    pUploadHandleInfo = getStreamUploadInfo(pKinesisVideoStream, uploadHandle);

    // Should indicate an abort for an invalid handle or handle that is not in the queue
    CHK(pUploadHandleInfo != NULL, STATUS_UPLOAD_HANDLE_ABORTED);

    // NOTE: Every CHK(FALSE, ...) below leaves the function through CleanUp, so the cases that end
    // with one do not fall through into the next case.
    switch (pUploadHandleInfo->state) {
        case UPLOAD_HANDLE_STATE_READY:
            // if we've created a new handle but has nothing to send
            if (pKinesisVideoStream->streamStopped) {
                // Get the duration and the size
                CHK_STATUS(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize));

                if (viewByteSize == 0) {
                    pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                    CHK(FALSE, STATUS_END_OF_STREAM);
                }
            }

            if (IS_VALID_TIMESTAMP(pKinesisVideoStream->newSessionTimestamp)) {
                // set upload handle start timestamp to map ack timestamp to view item timestamp in case
                // of using relative timestamp mode.
                pUploadHandleInfo->timestamp = pKinesisVideoStream->newSessionTimestamp;

                // Zero the sentinels
                pKinesisVideoStream->newSessionTimestamp = INVALID_TIMESTAMP_VALUE;
                pKinesisVideoStream->newSessionIndex = INVALID_VIEW_INDEX_VALUE;
            } else if (!pKinesisVideoStream->streamInfo.streamCaps.absoluteFragmentTimes) {
                // in relative time mode, upload handle timestamp is required.
                CHK(FALSE, STATUS_NO_MORE_DATA_AVAILABLE);
            }

            pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_STREAMING;

            // Update the active session count in metrics
            pKinesisVideoStream->diagnostics.totalActiveSessions++;
            break;

        case UPLOAD_HANDLE_STATE_AWAITING_ACK:
            // handle case where new handle got last ack already
            if (pUploadHandleInfo->lastPersistedAckTs == pUploadHandleInfo->lastFragmentTs) {
                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                CHK(FALSE, STATUS_END_OF_STREAM);
            }

            CHK(FALSE, STATUS_AWAITING_PERSISTED_ACK);

        case UPLOAD_HANDLE_STATE_ACK_RECEIVED:
            // Set the state to terminated and early return with the EOS with the right handle
            pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
            DLOGI("Indicating an EOS after last persisted ACK is received for stream upload handle %" PRIu64, uploadHandle);
            CHK(FALSE, STATUS_END_OF_STREAM);

        case UPLOAD_HANDLE_STATE_TERMINATED:
            // This path gets invoked if a connection gets terminated by calling reset connection
            DLOGW("Indicating an end-of-stream for a terminated stream upload handle %" PRIu64, uploadHandle);
            CHK(FALSE, STATUS_END_OF_STREAM);

        case UPLOAD_HANDLE_STATE_ERROR:
            DLOGW("Indicating an abort for a errored stream upload handle %" PRIu64, uploadHandle);
            CHK(FALSE, STATUS_UPLOAD_HANDLE_ABORTED);

        default:
            // no-op for other UPLOAD_HANDLE states
            break;
    }

    // Continue filling the buffer from the point we left off
    do {
        // First and foremost, we need to check whether we need to
        // send the packaged not-yet sent metadata, then
        // we need to check whether we need to send the packaged EOS metadata
        // Next, we need to check whether we have an allocation handle.
        // This could happen in case when we start getting the data out or
        // when the last time we got the data the item fell off the view window.
        if (IS_UPLOAD_HANDLE_IN_SENDING_EOS_STATE(pUploadHandleInfo) && pKinesisVideoStream->metadataTracker.send) {
            // Check if we have finished sending the packaged metadata and reset the values
            if (pKinesisVideoStream->metadataTracker.offset == pKinesisVideoStream->metadataTracker.size) {
                pKinesisVideoStream->metadataTracker.offset = 0;
                pKinesisVideoStream->metadataTracker.send = FALSE;

                // Set to send the EoS
                pKinesisVideoStream->eosTracker.send = TRUE;
            } else {
                // Copy as much as we can
                size = MIN(remainingSize, pKinesisVideoStream->metadataTracker.size - pKinesisVideoStream->metadataTracker.offset);
                MEMCPY(pCurPnt, pKinesisVideoStream->metadataTracker.data + pKinesisVideoStream->metadataTracker.offset, size);

                // Set the values for the next iteration.
                pKinesisVideoStream->metadataTracker.offset += size;
                pCurPnt += size;
                remainingSize -= size;
                *pFillSize += size;
            }
        } else if (IS_UPLOAD_HANDLE_IN_SENDING_EOS_STATE(pUploadHandleInfo) && pKinesisVideoStream->eosTracker.send) {
            if (pKinesisVideoStream->eosTracker.offset == pKinesisVideoStream->eosTracker.size) {
                // The whole EoS has been handed out - reset the tracker and finalize the handle state
                pKinesisVideoStream->eosTracker.offset = 0;
                pKinesisVideoStream->eosTracker.send = FALSE;

                // Calculate the state for the upload handle based on whether we need to await for the last ACK
                if (WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream)) {
                    // The last persisted ACK might have already arrived, in which case there is nothing to wait for
                    if (IS_VALID_TIMESTAMP(pUploadHandleInfo->lastPersistedAckTs) &&
                        pUploadHandleInfo->lastPersistedAckTs == pUploadHandleInfo->lastFragmentTs) {
                        pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                        CHK(FALSE, STATUS_END_OF_STREAM);
                    }

                    pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_AWAITING_ACK;
                    DLOGI("Handle %" PRIu64 " waiting for last persisted ack with ts %" PRIu64, pUploadHandleInfo->handle,
                          pUploadHandleInfo->lastFragmentTs);

                    // Will terminate later after the ACK is received
                    CHK(FALSE, STATUS_AWAITING_PERSISTED_ACK);
                } else {
                    pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;

                    // The client should terminate and close the stream after finalizing any remaining transfer.
                    CHK(FALSE, STATUS_END_OF_STREAM);
                }
            }

            // Copy as much as we can
            size = MIN(remainingSize, pKinesisVideoStream->eosTracker.size - pKinesisVideoStream->eosTracker.offset);
            MEMCPY(pCurPnt, pKinesisVideoStream->eosTracker.data + pKinesisVideoStream->eosTracker.offset, size);

            // Set the values
            pKinesisVideoStream->eosTracker.offset += size;
            pCurPnt += size;
            remainingSize -= size;
            *pFillSize += size;
        } else if (!IS_VALID_ALLOCATION_HANDLE(pKinesisVideoStream->curViewItem.viewItem.handle)) {
            // Reset the current view item
            pKinesisVideoStream->curViewItem.offset = pKinesisVideoStream->curViewItem.viewItem.length = 0;

            // Fix-up the current item as it might be a stream start
            CHK_STATUS(resetCurrentViewItemStreamStart(pKinesisVideoStream));

            // Need to find the next key frame boundary in case of the current rolling out of the window
            CHK_STATUS(getNextBoundaryViewItem(pKinesisVideoStream, &pViewItem));

            if (pViewItem != NULL) {
                // Reset the item ACK flags as this might be replay after rollback
                CLEAR_ITEM_BUFFERING_ACK(pViewItem->flags);
                CLEAR_ITEM_RECEIVED_ACK(pViewItem->flags);

                // Use this as a current for the next iteration
                pKinesisVideoStream->curViewItem.viewItem = *pViewItem;
            } else {
                // Couldn't find any boundary items, default to empty - early return
                CHK(FALSE, STATUS_NO_MORE_DATA_AVAILABLE);
            }
        } else if (pKinesisVideoStream->curViewItem.offset == pKinesisVideoStream->curViewItem.viewItem.length) {
            // The current view item has been exhausted - advance to the next one.

            // Check if the current was a special EoFr in which case we don't need to send EoS
            eosSent = CHECK_ITEM_FRAGMENT_END(pKinesisVideoStream->curViewItem.viewItem.flags) ? TRUE : FALSE;

            // Fix-up the current item as it might be a stream start
            CHK_STATUS(resetCurrentViewItemStreamStart(pKinesisVideoStream));

            // Get the next item. Running out of items is not an error at this point and is handled below.
            retStatus = getNextViewItem(pKinesisVideoStream, &pViewItem);
            CHK(retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS || retStatus == STATUS_SUCCESS, retStatus);

            // Special handling for the stopped stream.
            if (retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS) {
                // This requires user to call stop stream as soon as they are done.
                // Need to kick off the EOS sequence
                if (!pKinesisVideoStream->streamStopped) {
                    // Early return. The status is translated to STATUS_NO_MORE_DATA_AVAILABLE in the cleanup.
                    CHK(FALSE, retStatus);
                }

                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATING;
                pKinesisVideoStream->eosTracker.send = TRUE;

                // Marking the EoS as fully consumed makes the next iteration skip straight to finalizing the handle
                pKinesisVideoStream->eosTracker.offset = eosSent ? pKinesisVideoStream->eosTracker.size : 0;

                // If we have eosSent then we can't append the last metadata afterwards as it will break the MKV cluster
                if (eosSent) {
                    pKinesisVideoStream->metadataTracker.send = FALSE;
                }
            } else {
                // Reset the item ACK flags as this might be replay after rollback
                CLEAR_ITEM_BUFFERING_ACK(pViewItem->flags);
                CLEAR_ITEM_RECEIVED_ACK(pViewItem->flags);

                // Store it in the current view
                pKinesisVideoStream->curViewItem.viewItem = *pViewItem;
                pKinesisVideoStream->curViewItem.offset = 0;

                // Check if we are finishing the previous stream and we have a boundary item
                if (CHECK_ITEM_STREAM_START(pKinesisVideoStream->curViewItem.viewItem.flags)) {
                    // Set the stream handle as we have exhausted the previous
                    pUploadHandleInfo->state =
                        WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) ? UPLOAD_HANDLE_STATE_TERMINATING : UPLOAD_HANDLE_STATE_TERMINATED;

                    // Indicate to send EOS in the next iteration
                    pKinesisVideoStream->eosTracker.send = TRUE;
                    pKinesisVideoStream->eosTracker.offset = eosSent ? pKinesisVideoStream->eosTracker.size : 0;

                    // If we have eosSent then we can't append the last metadata afterwards as it will break the MKV cluster
                    if (eosSent) {
                        pKinesisVideoStream->metadataTracker.send = FALSE;
                    }
                }
            }
        } else {
            // Now, we can stream enough data out if we don't have a zero item.
            // NOTE: The previous branch already handles offset == length so this is a defensive check only.
            CHK(pKinesisVideoStream->curViewItem.offset != pKinesisVideoStream->curViewItem.viewItem.length, STATUS_NO_MORE_DATA_AVAILABLE);

            // Set the key frame indicator on fragment or session start to track the persist ACKs
            if (CHECK_ITEM_FRAGMENT_START(pKinesisVideoStream->curViewItem.viewItem.flags) ||
                CHECK_ITEM_STREAM_START(pKinesisVideoStream->curViewItem.viewItem.flags)) {
                pUploadHandleInfo->lastFragmentTs = pKinesisVideoStream->curViewItem.viewItem.ackTimestamp;
            }

            // Lock the client
            pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
            clientLocked = TRUE;

            // Fill the rest of the buffer of the current view item first
            // Map the storage
            CHK_STATUS(heapMap(pKinesisVideoClient->pHeap, pKinesisVideoStream->curViewItem.viewItem.handle, (PVOID*) &pAlloc, &allocSize));
            CHK(allocSize < MAX_UINT32 && (UINT32) allocSize >= pKinesisVideoStream->curViewItem.viewItem.length, STATUS_INVALID_ALLOCATION_SIZE);

            // Validate we had allocated enough storage just in case
            CHK(pKinesisVideoStream->curViewItem.viewItem.length - pKinesisVideoStream->curViewItem.offset <= allocSize,
                STATUS_VIEW_ITEM_SIZE_GREATER_THAN_ALLOCATION);

            // Copy as much as we can
            size = MIN(remainingSize, pKinesisVideoStream->curViewItem.viewItem.length - pKinesisVideoStream->curViewItem.offset);
            MEMCPY(pCurPnt, pAlloc + pKinesisVideoStream->curViewItem.offset, size);

            // Unmap the storage for the frame
            CHK_STATUS(heapUnmap(pKinesisVideoClient->pHeap, ((PVOID) pAlloc)));

            // Unlock the client
            pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
            clientLocked = FALSE;

            // Set the values
            pKinesisVideoStream->curViewItem.offset += size;
            pCurPnt += size;
            remainingSize -= size;
            *pFillSize += size;
        }
    } while (remainingSize != 0);

CleanUp:

    // The post-processing below reads and mutates the stream state, so it must only run when the
    // arguments passed validation and the stream lock was taken. streamLocked is set right after both.
    //
    // CHK/CHK_STATUS are deliberately not used past this point: they bail out by jumping to CleanUp,
    // which from here would re-run this whole section and could spin with the lock held for as long
    // as the failing step keeps failing. A failing step is recorded in retStatus instead and the
    // remaining steps still run.
    if (streamLocked) {
        // Run staleness detection only if we have retrieved any data
        if (*pFillSize != 0) {
            stalenessCheckStatus = checkForConnectionStaleness(pKinesisVideoStream, &pKinesisVideoStream->curViewItem.viewItem);
        }

        if (retStatus == STATUS_CONTENT_VIEW_NO_MORE_ITEMS) {
            // Replace it with a client side error
            retStatus = STATUS_NO_MORE_DATA_AVAILABLE;
        }

        // Calculate the metrics for transfer rate on successful call and when we do have some data.
        if (pKinesisVideoStream->streamInfo.streamCaps.recalculateMetrics && IS_VALID_GET_STREAM_DATA_STATUS(retStatus) && *pFillSize != 0) {
            currentTime = pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
            pKinesisVideoStream->diagnostics.accumulatedByteCount += *pFillSize;

            // Store the overall transferred byte count
            pKinesisVideoStream->diagnostics.transferredBytes += *pFillSize;

            if (!restarted) {
                // Calculate the delta time in seconds
                deltaInSeconds = (DOUBLE)(currentTime - pKinesisVideoStream->diagnostics.lastTransferRateTimestamp) / HUNDREDS_OF_NANOS_IN_A_SECOND;

                // Only update the rate once enough time has elapsed since the last measurement
                if (deltaInSeconds > TRANSFER_RATE_MEASURING_INTERVAL_EPSILON) {
                    transferRate = pKinesisVideoStream->diagnostics.accumulatedByteCount / deltaInSeconds;

                    // Update the current transfer rate
                    pKinesisVideoStream->diagnostics.currentTransferRate =
                        (UINT64) EMA_ACCUMULATOR_GET_NEXT(pKinesisVideoStream->diagnostics.currentTransferRate, transferRate);

                    // Zero out for next time measuring
                    pKinesisVideoStream->diagnostics.accumulatedByteCount = 0;

                    // Store the last measurement timestamp
                    pKinesisVideoStream->diagnostics.lastTransferRateTimestamp = currentTime;
                }
            } else {
                // First call after a restart - only re-base the measurement timestamp
                pKinesisVideoStream->diagnostics.lastTransferRateTimestamp = currentTime;
            }
        }

        // Special handling for stopped stream when the retention period is zero or no more data available
        if (pKinesisVideoStream->streamStopped) {
            // Trigger stream closed function when we don't need to wait for the persisted ack
            // Or if we do need to wait for the ack and the state of the upload handler is terminated
            if (retStatus == STATUS_END_OF_STREAM &&
                (!WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) ||
                 (pUploadHandleInfo != NULL && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED))) {
                // Get the duration and the size
                cleanupStatus = getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize);

                // Get the number of upload handles in the queue
                if (!STATUS_FAILED(cleanupStatus)) {
                    cleanupStatus = stackQueueGetCount(pKinesisVideoStream->pUploadInfoQueue, &uploadHandleCount);
                }

                // If there is no more data to send and current handle is the last one, wrap up by calling streamClosedFn.
                if (!STATUS_FAILED(cleanupStatus) && viewByteSize == 0 && uploadHandleCount == 1) {
                    cleanupStatus = notifyStreamClosed(pKinesisVideoStream, uploadHandle);
                }

                if (STATUS_FAILED(cleanupStatus)) {
                    retStatus = cleanupStatus;
                }
            }
        }

        // when pUploadHandleInfo is in UPLOAD_HANDLE_STATE_TERMINATED or UPLOAD_HANDLE_STATE_AWAITING_ACK, it means current handle
        // has sent all the data. So it's safe to unblock the next one if any. If WAIT_FOR_PERSISTED_ACK is enabled, no need
        // to execute data available again when UPLOAD_HANDLE_STATE_TERMINATED because we did it already when upload handle
        // was in UPLOAD_HANDLE_STATE_AWAITING_ACK state.
        if (NULL != pUploadHandleInfo &&
            (pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_AWAITING_ACK ||
             (!WAIT_FOR_PERSISTED_ACK(pKinesisVideoStream) && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED))) {
            // Special handling for the case:
            // When the stream has been stopped so no more putFrame calls that drive the data availability
            // When the current upload handle is in EoS
            // When in offline mode, and the putFrame thread has been blocked so no more putFrame calls that drive
            // the data availability
            // When we still have ready upload handles which are awaiting
            // We need to trigger their data ready callback to enable them

            // Get the next upload handle that is ready to stream.
            // Next upload handle could be in NEW state if there is a delay in putStreamResult
            pNextUploadHandleInfo =
                getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_NEW | UPLOAD_HANDLE_STATE_READY | UPLOAD_HANDLE_STATE_STREAMING);

            // Notify the awaiting handle to enable it
            if (pNextUploadHandleInfo != NULL) {
                // Get the duration and the size
                cleanupStatus = getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize);

                if (!STATUS_FAILED(cleanupStatus)) {
                    // If there no more item for the next handle to send and stream has stopped, set its state to terminated
                    // and poke the handle. The handle is supposed to terminate gracefully and poke the next handle if there is any.
                    if (viewByteSize == 0 && pKinesisVideoStream->streamStopped) {
                        pNextUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;
                    }

                    cleanupStatus = pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
                        pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
                        pNextUploadHandleInfo->handle, duration, viewByteSize);
                }

                if (STATUS_FAILED(cleanupStatus)) {
                    retStatus = cleanupStatus;
                }
            }
        }

        // Remove the upload handle if it is in UPLOAD_HANDLE_STATE_TERMINATED state
        if (NULL != pUploadHandleInfo && pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED) {
            deleteStreamUploadInfo(pKinesisVideoStream, pUploadHandleInfo);

            // if there is no more active upload handle and we still have bytes to transfer,
            // call kinesisVideoStreamResetConnection to create new upload session.
            pUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_ACTIVE);
            if (pUploadHandleInfo == NULL) {
                // Best effort: the outcome does not affect the returned status. The size is only
                // trusted when the query succeeded, otherwise viewByteSize could be stale or unset.
                if (!STATUS_FAILED(getAvailableViewSize(pKinesisVideoStream, &duration, &viewByteSize)) && viewByteSize != 0) {
                    kinesisVideoStreamResetConnection(TO_STREAM_HANDLE(pKinesisVideoStream));
                }
            }
        }
    }

    // Release the locks in the reverse order of acquisition. The client lock is only still held
    // here when the copy path bailed out between mapping and unmapping the allocation.
    if (clientLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    }

    if (streamLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    }

    // Return the staleness check status if it's not a success
    if (STATUS_FAILED(stalenessCheckStatus)) {
        retStatus = stalenessCheckStatus;
    }

    LEAVES();
    return retStatus;
}