in src/client/src/StreamEvent.c [784:914]
/**
 * Handles a fragment ACK that arrived for one of the stream's upload handles.
 *
 * Processing steps, in order:
 *  1. Validate the arguments and the ACK structure version.
 *  2. Return immediately (with success) on an IDLE ACK.
 *  3. Take the stream lock.
 *  4. Ignore (with success) an ACK whose upload handle is unknown, is in the
 *     ERROR or TERMINATED state, or carries an invalid handle value.
 *  5. Resolve the ACK timestamp: taken from the content view when the ACK has
 *     no valid timestamp, otherwise converted from the ACK timecode and, for
 *     streams without absolute fragment times, offset by the upload handle
 *     timestamp.
 *  6. Dispatch on the ACK type. BUFFERING/RECEIVED/PERSISTED are applied only
 *     when the timestamp is inside the view window; ERROR is always applied.
 *  7. Report STATUS_ACK_TIMESTAMP_NOT_IN_VIEW_WINDOW when the timestamp was
 *     outside the view window.
 *
 * The client's fragmentAckReceivedFn callback (when set) is invoked from the
 * cleanup section regardless of the processing result, but only once the
 * client pointer has been captured - i.e. not for failed argument/version
 * validation and not for IDLE ACKs. The callback runs before the stream lock
 * is released.
 *
 * @param pKinesisVideoStream Stream object the ACK belongs to. Must not be NULL
 *                            and must reference a non-NULL client.
 * @param uploadHandle        Upload handle the ACK was received on.
 * @param pFragmentAck        The ACK to process. Must not be NULL.
 *
 * @return STATUS_SUCCESS on success or when the ACK is ignored;
 *         STATUS_NULL_ARG on a NULL stream, client or ACK;
 *         STATUS_INVALID_FRAGMENT_ACK_VERSION on an unsupported ACK version;
 *         STATUS_INVALID_FRAGMENT_ACK_TYPE on an unknown ACK type;
 *         STATUS_ACK_TIMESTAMP_NOT_IN_VIEW_WINDOW when the resolved timestamp
 *         is not in the content view; otherwise the failure status of the
 *         content view, MKV generator or per-type ACK handler call.
 */
STATUS streamFragmentAckEvent(PKinesisVideoStream pKinesisVideoStream, UPLOAD_HANDLE uploadHandle, PFragmentAck pFragmentAck)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PKinesisVideoClient pKinesisVideoClient = NULL;
    PUploadHandleInfo pUploadHandleInfo;
    BOOL locked = FALSE, inView = FALSE;
    UINT64 timestamp = 0, errorSkipStart, curIndex;
    PViewItem pViewItem;

    CHK(pKinesisVideoStream != NULL && pKinesisVideoStream->pKinesisVideoClient != NULL && pFragmentAck != NULL, STATUS_NULL_ARG);

    // Check the version
    CHK(pFragmentAck->version <= FRAGMENT_ACK_CURRENT_VERSION, STATUS_INVALID_FRAGMENT_ACK_VERSION);

    // Early return if we have an IDLE ack
    if (pFragmentAck->ackType == FRAGMENT_ACK_TYPE_IDLE) {
        // Nothing to do if we have an IDLE ack. The client pointer is still NULL
        // at this point so the cleanup section will not notify the callback.
        CHK(FALSE, retStatus);
    }

    pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;

    // Lock the state
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    locked = TRUE;

    // First of all, check if the ACK is for a session that's expired/closed already and ignore if it is
    pUploadHandleInfo = getStreamUploadInfo(pKinesisVideoStream, uploadHandle);
    if (NULL == pUploadHandleInfo ||
        (pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_ERROR || pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_TERMINATED) ||
        !IS_VALID_UPLOAD_HANDLE(pUploadHandleInfo->handle)) {
        // No session is present - early return with the current (success) status
        DLOGW("An ACK is received for an already expired upload handle %" PRIu64, uploadHandle);
        CHK(FALSE, retStatus);
    }

    // Fix-up the invalid timestamp
    if (!IS_VALID_TIMESTAMP(pFragmentAck->timestamp)) {
        // Use the current from the view.
        // If the current is greater than head which is the case when the networking pulls fast enough
        // then we will use the head instead.
        CHK_STATUS(contentViewGetCurrentIndex(pKinesisVideoStream->pView, &curIndex));
        CHK_STATUS(contentViewGetHead(pKinesisVideoStream->pView, &pViewItem));
        if (curIndex < pViewItem->index) {
            CHK_STATUS(contentViewGetItemAt(pKinesisVideoStream->pView, curIndex, &pViewItem));
        }

        timestamp = pViewItem->ackTimestamp;

        // If we already have a persisted ACK, no need to go farther back
        errorSkipStart =
            IS_VALID_TIMESTAMP(pUploadHandleInfo->lastPersistedAckTs) ? pUploadHandleInfo->lastPersistedAckTs : pUploadHandleInfo->timestamp;
    } else {
        // Calculate the timestamp based on the ACK.
        // Convert the ACK timecode into a timestamp; the result is written to the local 'timestamp'.
        CHK_STATUS(mkvgenTimecodeToTimestamp(pKinesisVideoStream->pMkvGenerator, pFragmentAck->timestamp, &timestamp));

        // In case we have a relative cluster timestamp stream we need to adjust for the stream start.
        // The stream start timestamp is extracted from the stream session map.
        if (!pKinesisVideoStream->streamInfo.streamCaps.absoluteFragmentTimes && IS_VALID_TIMESTAMP(pUploadHandleInfo->timestamp)) {
            // Adjust the relative timestamp to make an absolute timestamp
            timestamp += pUploadHandleInfo->timestamp;
        }

        // No skipping farther than the given timestamp
        errorSkipStart = timestamp;
    }

    // Quick check whether the timestamp is in the view window. The result gates the
    // non-error ACK handlers below and determines the final return status.
    CHK_STATUS(contentViewTimestampInRange(pKinesisVideoStream->pView, timestamp, TRUE, &inView));

    // NOTE: IMPORTANT: For the Error Ack case we will still need to process the ACK. The side-effect of
    // processing the Error Ack is the connection termination which is needed as the higher-level clients like
    // CURL might not trigger the termination after the LB sends 'FIN' TCP packet.
    switch (pFragmentAck->ackType) {
        case FRAGMENT_ACK_TYPE_BUFFERING:
            if (inView) {
                CHK_STATUS(streamFragmentBufferingAck(pKinesisVideoStream, timestamp));
            }

            break;

        case FRAGMENT_ACK_TYPE_RECEIVED:
            if (inView) {
                CHK_STATUS(streamFragmentReceivedAck(pKinesisVideoStream, timestamp));
            }

            break;

        case FRAGMENT_ACK_TYPE_PERSISTED:
            if (inView) {
                CHK_STATUS(streamFragmentPersistedAck(pKinesisVideoStream, timestamp, pUploadHandleInfo));
            }

            break;

        case FRAGMENT_ACK_TYPE_ERROR:
            if (!inView) {
                // Apply to the earliest.
                CHK_STATUS(contentViewGetTail(pKinesisVideoStream->pView, &pViewItem));
                timestamp = pViewItem->ackTimestamp;
            }

            CHK_STATUS(streamFragmentErrorAck(pKinesisVideoStream, errorSkipStart, timestamp, pFragmentAck->result));

            break;

        default:
            // shouldn't ever be the case.
            CHK(FALSE, STATUS_INVALID_FRAGMENT_ACK_TYPE);
    }

    // Still return an error on an out-of-bounds timestamp
    CHK(inView, STATUS_ACK_TIMESTAMP_NOT_IN_VIEW_WINDOW);

CleanUp:

    if (pKinesisVideoClient != NULL) {
        // We will notify the fragment ACK received callback even if the processing failed
        if (pKinesisVideoClient->clientCallbacks.fragmentAckReceivedFn != NULL) {
            pKinesisVideoClient->clientCallbacks.fragmentAckReceivedFn(pKinesisVideoClient->clientCallbacks.customData,
                                                                       TO_STREAM_HANDLE(pKinesisVideoStream), uploadHandle, pFragmentAck);
        }

        // Unlock the stream. 'locked' is only set after the mutex was actually taken.
        if (locked) {
            pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
        }
    }

    LEAVES();
    return retStatus;
}