in src/client/src/StreamEvent.c [638:779]
/**
 * Handles the termination of an upload session of a stream.
 *
 * While holding the stream lock, and unless the call result is
 * SERVICE_CALL_STREAM_AUTH_IN_GRACE_PERIOD, the affected upload handle(s) are marked as
 * UPLOAD_HANDLE_STATE_TERMINATED, the stream connection state is updated to control whether
 * a rollback is triggered, and the upper layer is notified through streamDataAvailableFn.
 * Afterwards, unless another active upload handle exists or the state machine is already in
 * one of the pre-streaming states, the stream is moved to the stopped state and the state
 * machine is stepped.
 *
 * @param pKinesisVideoStream - Stream object. Must be non-NULL and reference a non-NULL client,
 *                              otherwise STATUS_NULL_ARG is returned.
 * @param uploadHandle - Upload handle that terminated. When it is not a valid handle, every
 *                       upload session in the queue is terminated.
 * @param callResult - Service call result for the termination. Stored as the stream result
 *                     before the state machine is stepped.
 * @param connectionStillAlive - When TRUE, the terminated handle is pinged through
 *                               streamDataAvailableFn; when FALSE, its upload info is deleted.
 *
 * @return STATUS_SUCCESS on success, otherwise the status of the first failing checked call.
 */
STATUS streamTerminatedEvent(PKinesisVideoStream pKinesisVideoStream, UPLOAD_HANDLE uploadHandle, SERVICE_CALL_RESULT callResult,
                             BOOL connectionStillAlive)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PStateMachineState pState;
    PKinesisVideoClient pKinesisVideoClient = NULL;
    PUploadHandleInfo pUploadHandleInfo, pActiveUploadHandleInfo = NULL;
    UINT32 sessionCount = 0, i = 0;
    UINT64 item;
    BOOL locked = FALSE, spawnNewUploadSession = TRUE, uploadHandleNotUsed = FALSE, uploadHandleAwaitingAck = FALSE;

    CHK(pKinesisVideoStream != NULL && pKinesisVideoStream->pKinesisVideoClient != NULL, STATUS_NULL_ARG);
    pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;

    // Lock the state
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    locked = TRUE;

    // We should handle the in-grace termination differently by not setting the terminated state
    if (SERVICE_CALL_STREAM_AUTH_IN_GRACE_PERIOD != callResult) {
        // Set default to UPLOAD_CONNECTION_STATE_IN_USE which will trigger rollback
        pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_IN_USE;

        // If an invalid upload handle is specified, terminate all uploading sessions and let
        // the state machine spawn a new session.
        if (!IS_VALID_UPLOAD_HANDLE(uploadHandle)) {
            CHK_STATUS(stackQueueGetCount(pKinesisVideoStream->pUploadInfoQueue, &sessionCount));
            for (i = 0; i < sessionCount; i++) {
                CHK_STATUS(stackQueueGetAt(pKinesisVideoStream->pUploadInfoQueue, i, &item));
                pUploadHandleInfo = (PUploadHandleInfo) item;
                CHK(pUploadHandleInfo != NULL, STATUS_INTERNAL_ERROR);
                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;

                // Pulse the upload handle to receive the terminated status. The assumption is that
                // the upper layer uploading session should not be dead and should call getStreamData
                // after receiving the streamDataAvailable callback.
                CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
                    pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
                    pUploadHandleInfo->handle, 0, 0));
            }
        } else {
            pUploadHandleInfo = getStreamUploadInfo(pKinesisVideoStream, uploadHandle);
            if (pUploadHandleInfo == NULL) {
                DLOGW("streamTerminatedEvent called for unknown upload handle %" PRIu64, uploadHandle);
            } else {
                // If the upload handle has not streamed any data, we can safely ignore this event.
                // Else the upload handle has streamed some data, set flag to trigger rollback in the next getStreamData call.
                uploadHandleNotUsed = IS_UPLOAD_HANDLE_IN_STATE(pUploadHandleInfo, UPLOAD_HANDLE_STATE_NOT_IN_USE);

                // Remember whether the handle was waiting for the last ACK. This has to be captured
                // before the state is overwritten below, otherwise the check can never succeed.
                uploadHandleAwaitingAck = (pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_AWAITING_ACK);

                // Set the state to terminated
                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;

                if (uploadHandleNotUsed) {
                    // Need to indicate to the getStreamData to not rollback.
                    pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_NOT_IN_USE;
                } else {
                    pActiveUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_ACTIVE);
                    if (pActiveUploadHandleInfo != NULL) {
                        // If the errored handle is the only handle, then rollback,
                        // otherwise do not rollback because the rollback will corrupt the other active handle.
                        pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_NOT_IN_USE;

                        DLOGW("Last fragment with timestamp %" PRIu64 " for upload handle %" PRIu64 " might not be fully persisted",
                              pUploadHandleInfo->lastFragmentTs, uploadHandle);

                        // Report the missing persisted ACK only if the handle was awaiting it at
                        // the time of termination and the optional error callback is set.
                        if (uploadHandleAwaitingAck && pKinesisVideoClient->clientCallbacks.streamErrorReportFn != NULL) {
                            pKinesisVideoClient->clientCallbacks.streamErrorReportFn(
                                pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), uploadHandle,
                                pUploadHandleInfo->lastFragmentTs, STATUS_PUTMEDIA_LAST_PERSIST_ACK_NOT_RECEIVED);
                        }
                    }
                }

                // In case of reset connection and error acks, need to ping the terminated upload handle so that it
                // can unpause if paused and then call getStreamData and receive the end-of-stream status
                if (connectionStillAlive) {
                    // NOTE(review): the callback result is not checked here, unlike the
                    // terminate-all loop above - confirm this best-effort behavior is intended.
                    pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(pKinesisVideoClient->clientCallbacks.customData,
                                                                               TO_STREAM_HANDLE(pKinesisVideoStream),
                                                                               pKinesisVideoStream->streamInfo.name, pUploadHandleInfo->handle, 0, 0);
                } else {
                    // If the connection that the upload handle represents is already dead then it will not make any more
                    // getStreamData calls so it should be removed.
                    deleteStreamUploadInfo(pKinesisVideoStream, pUploadHandleInfo);
                    pUploadHandleInfo = NULL;
                }

                // Signal the next active session that it can start getting stream data.
                pActiveUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_ACTIVE);
                if (pActiveUploadHandleInfo != NULL) {
                    // Don't spawn a new session since we already have an active one
                    spawnNewUploadSession = FALSE;

                    // NOTE(review): callback result is not checked here either - confirm intent.
                    pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
                        pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
                        pActiveUploadHandleInfo->handle, 0, 0);
                }
            }
        }
    }

    // If we had an auth failure then we will not exit early, we need to retry auth.
    // NOTE(review): this reads the client's base.result rather than the stream's - confirm intended.
    if (pKinesisVideoClient->base.result != SERVICE_CALL_AUTH_FAILURE) {
        // Return early (with the current retStatus) if PIC is already in the process of spawning
        // a new uploading session, i.e. the state machine accepts one of the states below.
        CHK(!STATUS_SUCCEEDED(acceptStateMachineState(pKinesisVideoStream->base.pStateMachine,
                                                      STREAM_STATE_DESCRIBE | STREAM_STATE_CREATE | STREAM_STATE_TAG_STREAM | STREAM_STATE_GET_TOKEN |
                                                          STREAM_STATE_GET_ENDPOINT | STREAM_STATE_READY)),
            retStatus);
    }

    if (spawnNewUploadSession) {
        // Stop the stream
        pKinesisVideoStream->streamState = STREAM_STATE_STOPPED;

        // Get the accepted state
        CHK_STATUS(getStateMachineState(pKinesisVideoStream->base.pStateMachine, STREAM_STATE_STOPPED, &pState));

        // Check for the right state
        CHK_STATUS(acceptStateMachineState(pKinesisVideoStream->base.pStateMachine, pState->acceptStates));

        // Store the result
        pKinesisVideoStream->base.result = callResult;

        // Step the machine
        CHK_STATUS(stepStateMachine(pKinesisVideoStream->base.pStateMachine));
    }

CleanUp:

    // Unlock the stream
    if (locked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    }

    LEAVES();
    return retStatus;
}