STATUS streamTerminatedEvent()

in src/client/src/StreamEvent.c [638:779]


STATUS streamTerminatedEvent(PKinesisVideoStream pKinesisVideoStream, UPLOAD_HANDLE uploadHandle, SERVICE_CALL_RESULT callResult,
                             BOOL connectionStillAlive)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PStateMachineState pState;
    PKinesisVideoClient pKinesisVideoClient = NULL;
    PUploadHandleInfo pUploadHandleInfo, pActiveUploadHandleInfo = NULL;
    UINT32 sessionCount = 0, i = 0;
    UINT64 item;
    BOOL locked = FALSE, spawnNewUploadSession = TRUE, uploadHandleNotUsed = FALSE;

    CHK(pKinesisVideoStream != NULL && pKinesisVideoStream->pKinesisVideoClient != NULL, STATUS_NULL_ARG);
    pKinesisVideoClient = pKinesisVideoStream->pKinesisVideoClient;

    // Lock the state
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    locked = TRUE;

    // We should handle the in-grace termination differently by not setting the terminated state
    if (SERVICE_CALL_STREAM_AUTH_IN_GRACE_PERIOD != callResult) {
        // Set default to UPLOAD_CONNECTION_STATE_IN_USE which will trigger rollback
        pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_IN_USE;

        // If invalid upload handle is specified, terminated all uploading session, and let
        // state machine spawn new session.
        if (!IS_VALID_UPLOAD_HANDLE(uploadHandle)) {
            CHK_STATUS(stackQueueGetCount(pKinesisVideoStream->pUploadInfoQueue, &sessionCount));

            for (i = 0; i < sessionCount; i++) {
                CHK_STATUS(stackQueueGetAt(pKinesisVideoStream->pUploadInfoQueue, i, &item));
                pUploadHandleInfo = (PUploadHandleInfo) item;
                CHK(pUploadHandleInfo != NULL, STATUS_INTERNAL_ERROR);

                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;

                // pulse the upload handle to receive the terminated status. The assumption is that
                // upper layer uploading session should not be dead and should call getStreamData
                // after receiving streamDataAvailable callback.
                CHK_STATUS(pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
                    pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
                    pUploadHandleInfo->handle, 0, 0));
            }
        } else {
            pUploadHandleInfo = getStreamUploadInfo(pKinesisVideoStream, uploadHandle);

            if (pUploadHandleInfo == NULL) {
                DLOGW("streamTerminatedEvent called for unknown upload handle %" PRIu64, uploadHandle);
            } else {
                // If the upload handle has not streamed any data, we can safely ignore this event.
                // else the upload handle has streamed some data, set flag to trigger rollback in the next getStreamData call.
                uploadHandleNotUsed = IS_UPLOAD_HANDLE_IN_STATE(pUploadHandleInfo, UPLOAD_HANDLE_STATE_NOT_IN_USE);

                // Set the state to terminated
                pUploadHandleInfo->state = UPLOAD_HANDLE_STATE_TERMINATED;

                if (uploadHandleNotUsed) {
                    // Need to indicate to the getStreamData to not rollback.
                    pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_NOT_IN_USE;
                } else {
                    pActiveUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_ACTIVE);

                    if (pActiveUploadHandleInfo != NULL) {
                        // If the errored handle is the only handle, then rollback,
                        // otherwise do not rollback because the rollback will corrupt other active handle.
                        pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_NOT_IN_USE;

                        DLOGW("Last fragment with timestamp %" PRIu64 " for upload handle %" PRIu64 " might not be fully persisted",
                              pUploadHandleInfo->lastFragmentTs, uploadHandle);

                        if (pUploadHandleInfo->state == UPLOAD_HANDLE_STATE_AWAITING_ACK &&
                            pKinesisVideoClient->clientCallbacks.streamErrorReportFn != NULL) {
                            pKinesisVideoClient->clientCallbacks.streamErrorReportFn(
                                pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), uploadHandle,
                                pUploadHandleInfo->lastFragmentTs, STATUS_PUTMEDIA_LAST_PERSIST_ACK_NOT_RECEIVED);
                        }
                    }
                }

                // In case of reset connection and error acks, need to ping the terminated upload handle so that it
                // can unpause if paused and then call getStreamData and receive the end-of-stream status
                if (connectionStillAlive) {
                    pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(pKinesisVideoClient->clientCallbacks.customData,
                                                                               TO_STREAM_HANDLE(pKinesisVideoStream),
                                                                               pKinesisVideoStream->streamInfo.name, pUploadHandleInfo->handle, 0, 0);
                } else {
                    // If the connection that upload handle represents is already dead. Then it will not make anymore
                    // getStreamData call so it should be removed.
                    deleteStreamUploadInfo(pKinesisVideoStream, pUploadHandleInfo);
                    pUploadHandleInfo = NULL;
                }

                // signal the next active session that it can start getting stream data.
                pActiveUploadHandleInfo = getStreamUploadInfoWithState(pKinesisVideoStream, UPLOAD_HANDLE_STATE_ACTIVE);

                if (pActiveUploadHandleInfo != NULL) {
                    // dont spawn new session since we already have a active one
                    spawnNewUploadSession = FALSE;
                    pKinesisVideoClient->clientCallbacks.streamDataAvailableFn(
                        pKinesisVideoClient->clientCallbacks.customData, TO_STREAM_HANDLE(pKinesisVideoStream), pKinesisVideoStream->streamInfo.name,
                        pActiveUploadHandleInfo->handle, 0, 0);
                }
            }
        }
    }

    // if we had an auth failure then we will not exit early we need to retry auth
    if (pKinesisVideoClient->base.result != SERVICE_CALL_AUTH_FAILURE) {
        // return early if pic is already in process of spawning new uploading session
        CHK(!STATUS_SUCCEEDED(acceptStateMachineState(pKinesisVideoStream->base.pStateMachine,
                                                      STREAM_STATE_DESCRIBE | STREAM_STATE_CREATE | STREAM_STATE_TAG_STREAM | STREAM_STATE_GET_TOKEN |
                                                          STREAM_STATE_GET_ENDPOINT | STREAM_STATE_READY)),
            retStatus);
    }

    if (spawnNewUploadSession) {
        // Stop the stream
        pKinesisVideoStream->streamState = STREAM_STATE_STOPPED;

        // Get the accepted state
        CHK_STATUS(getStateMachineState(pKinesisVideoStream->base.pStateMachine, STREAM_STATE_STOPPED, &pState));

        // Check for the right state
        CHK_STATUS(acceptStateMachineState(pKinesisVideoStream->base.pStateMachine, pState->acceptStates));

        // store the result
        pKinesisVideoStream->base.result = callResult;

        // Step the machine
        CHK_STATUS(stepStateMachine(pKinesisVideoStream->base.pStateMachine));
    }

CleanUp:

    // Unlock the stream
    if (locked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoStream->base.lock);
    }

    LEAVES();
    return retStatus;
}