STATUS createStream()

in src/client/src/Stream.c [18:321]


/**
 * Creates and initializes a new stream object and registers it with the given client.
 *
 * The function validates the stream info, acquires the client's stream list lock followed by the
 * client lock, checks the stream count limit and the uniqueness of the stream name, and then
 * allocates the stream object together with trailing storage for the packaged tags, the segment
 * UUID and the track info array in a single allocation. It then creates the auxiliary objects
 * (optional frame order coordinator, content view, MKV generator, EOS metadata, state machine,
 * upload info and metadata queues, shutdown semaphore), stores the stream in the client's streams
 * array, releases the client lock and steps the stream state machine.
 *
 * On failure before the stream is stored in the client object, the partially constructed stream is
 * released with freeStream() while holding the stream list lock. Once the stream has been stored in
 * the client and returned through ppKinesisVideoStream, a later failure (i.e. from stepping the
 * state machine) does NOT tear the stream down; the caller still receives the stream pointer along
 * with the failure status.
 *
 * @param pKinesisVideoClient - IN - Client object the stream is created for. Must not be NULL.
 * @param pStreamInfo - IN - Stream description; checked by validateStreamInfo().
 * @param ppKinesisVideoStream - OUT - Receives the created stream. Must not be NULL. It is reset to
 *                               NULL before any other work is done.
 *
 * @return STATUS_SUCCESS on success. STATUS_NULL_ARG if pKinesisVideoClient or ppKinesisVideoStream is
 *         NULL, STATUS_MAX_STREAM_COUNT if the client already holds deviceInfo.streamCount streams,
 *         STATUS_DUPLICATE_STREAM_NAME if a stream with the same name exists,
 *         STATUS_NOT_ENOUGH_MEMORY if the stream allocation fails, or the failure status of any of
 *         the helper calls wrapped in CHK_STATUS.
 */
STATUS createStream(PKinesisVideoClient pKinesisVideoClient, PStreamInfo pStreamInfo, PKinesisVideoStream* ppKinesisVideoStream)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PKinesisVideoStream pKinesisVideoStream = NULL;
    PContentView pView = NULL;
    PStackQueue pStackQueue = NULL;
    PMkvGenerator pMkvGenerator = NULL;
    PStateMachine pStateMachine = NULL;
    UINT32 allocationSize, maxViewItems, i;
    PBYTE pCurPnt = NULL;
    BOOL clientLocked = FALSE, clientStreamsListLocked = FALSE;
    BOOL tearDownOnError = TRUE;
    CHAR tempStreamName[MAX_STREAM_NAME_LEN];
    UINT32 trackInfoSize, tagsSize;

    // NOTE: On failure this lands in CleanUp with pKinesisVideoClient possibly being NULL,
    // so the clean-up code must not dereference the client unconditionally.
    CHK(pKinesisVideoClient != NULL && ppKinesisVideoStream != NULL, STATUS_NULL_ARG);

    // Set the return stream pointer first
    *ppKinesisVideoStream = NULL;

    // Validate the input structs
    CHK_STATUS(validateStreamInfo(pStreamInfo, &pKinesisVideoClient->clientCallbacks));
    logStreamInfo(pStreamInfo);

    // Lock the client streams list lock because we will iterate over current streams + add more streams
    // We follow the principle that the streamListLock is *never* acquired inside a client or streams lock,
    // always outside to prevent deadlock
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.streamListLock);
    clientStreamsListLocked = TRUE;

    // Lock the client
    pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    clientLocked = TRUE;

    // Check for the stream count
    CHK(pKinesisVideoClient->streamCount < pKinesisVideoClient->deviceInfo.streamCount, STATUS_MAX_STREAM_COUNT);

    // Fix-up the stream name if not specified
    if (pStreamInfo->name[0] == '\0') {
        createRandomName(tempStreamName, DEFAULT_STREAM_NAME_LEN, pKinesisVideoClient->clientCallbacks.getRandomNumberFn,
                         pKinesisVideoClient->clientCallbacks.customData);
    } else {
        // Copy the stream name otherwise
        // NOTE: Stream name length has already been validated
        STRCPY(tempStreamName, pStreamInfo->name);
    }

    // Check if a stream by that name already exists
    for (i = 0; i < pKinesisVideoClient->deviceInfo.streamCount; i++) {
        if (NULL != pKinesisVideoClient->streams[i]) {
            CHK(0 != STRCMP(pKinesisVideoClient->streams[i]->streamInfo.name, tempStreamName), STATUS_DUPLICATE_STREAM_NAME);
        }
    }

    // Space for track info bits
    trackInfoSize = SIZEOF(TrackInfo) * pStreamInfo->streamCaps.trackInfoCount;

    // Get the max tags structure size
    // The first call passes no destination buffer and only retrieves the required size in tagsSize
    CHK_STATUS(packageTags(pStreamInfo->tagCount, pStreamInfo->tags, 0, NULL, &tagsSize));

    // Allocate the main struct
    // NOTE: The calloc will Zero the fields
    // We are allocating more than the structure size to accommodate for
    // the variable size buffers which will come at the end and for the tags
    // Layout of the allocation: [KinesisVideoStream][packaged tags][segment UUID][track info array]
    allocationSize = SIZEOF(KinesisVideoStream) + tagsSize + trackInfoSize + MKV_SEGMENT_UUID_LEN;
    pKinesisVideoStream = (PKinesisVideoStream) MEMCALLOC(1, allocationSize);
    CHK(pKinesisVideoStream != NULL, STATUS_NOT_ENOUGH_MEMORY);

    // Set the stream id first available slot so during the teardown it will clean it up.
    // The stream count check above ensures that at least one slot is free while we hold the locks.
    for (i = 0; i < pKinesisVideoClient->deviceInfo.streamCount; i++) {
        if (NULL == pKinesisVideoClient->streams[i]) {
            // Found an empty slot
            pKinesisVideoStream->streamId = i;
            break;
        }
    }

    // set the maximum frame size observed to 0
    pKinesisVideoStream->maxFrameSizeSeen = 0;

    // Set the back reference
    pKinesisVideoStream->pKinesisVideoClient = pKinesisVideoClient;

    // Set the basic info
    pKinesisVideoStream->base.identifier = KINESIS_VIDEO_OBJECT_IDENTIFIER_STREAM;
    pKinesisVideoStream->base.version = STREAM_CURRENT_VERSION;

    // Set the initial state and the stream status
    pKinesisVideoStream->streamState = STREAM_STATE_NONE;
    pKinesisVideoStream->streamStatus = STREAM_STATUS_CREATING;

    // Stream is not ready, not stopped and not closed
    pKinesisVideoStream->streamStopped = FALSE;
    pKinesisVideoStream->streamReady = FALSE;
    pKinesisVideoStream->streamClosed = FALSE;

    // Set the stream start timestamps and index
    pKinesisVideoStream->newSessionTimestamp = INVALID_TIMESTAMP_VALUE;
    pKinesisVideoStream->newSessionIndex = INVALID_VIEW_INDEX_VALUE;

    // Not in a grace period
    pKinesisVideoStream->gracePeriod = FALSE;

    // Shouldn't reset the generator on next key frame
    pKinesisVideoStream->resetGeneratorOnKeyFrame = FALSE;

    // Skip non-key frames at start
    pKinesisVideoStream->skipNonKeyFrames = TRUE;

    // Shouldn't reset the generator so set invalid time
    pKinesisVideoStream->resetGeneratorTime = INVALID_TIMESTAMP_VALUE;

    // No connections have been dropped as this is a new stream
    pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_OK;

    // Set the last frame EoFr indicator
    pKinesisVideoStream->eofrFrame = FALSE;

    // Set last PutFrame timestamp to invalid time value
    pKinesisVideoStream->lastPutFrameTimestamp = INVALID_TIMESTAMP_VALUE;

    // Set the initial diagnostics information from the defaults
    pKinesisVideoStream->diagnostics.currentFrameRate = pStreamInfo->streamCaps.frameRate;
    pKinesisVideoStream->diagnostics.elementaryFrameRate = pStreamInfo->streamCaps.frameRate;
    pKinesisVideoStream->diagnostics.currentTransferRate = pStreamInfo->streamCaps.avgBandwidthBps;
    pKinesisVideoStream->diagnostics.accumulatedByteCount = 0;
    pKinesisVideoStream->diagnostics.lastFrameRateTimestamp = pKinesisVideoStream->diagnostics.lastTransferRateTimestamp = 0;

    // Set the trackers
    pKinesisVideoStream->eosTracker.size = 0;
    pKinesisVideoStream->eosTracker.offset = 0;
    pKinesisVideoStream->eosTracker.send = FALSE;
    pKinesisVideoStream->eosTracker.data = NULL;
    pKinesisVideoStream->metadataTracker.size = 0;
    pKinesisVideoStream->metadataTracker.offset = 0;
    pKinesisVideoStream->metadataTracker.send = FALSE;
    pKinesisVideoStream->metadataTracker.data = NULL;

    // initialize streamingAuthInfo expiration
    pKinesisVideoStream->streamingAuthInfo.expiration = INVALID_TIMESTAMP_VALUE;

    // Reset the current view item
    MEMSET(&pKinesisVideoStream->curViewItem, 0x00, SIZEOF(CurrentViewItem));
    pKinesisVideoStream->curViewItem.viewItem.handle = INVALID_ALLOCATION_HANDLE_VALUE;

    // Copy the structures in their entirety
    // NOTE: This is a shallow copy - the tags, segment UUID and track info pointers still refer to the
    // caller's memory at this point and are re-pointed into our own allocation further below.
    MEMCPY(&pKinesisVideoStream->streamInfo, pStreamInfo, SIZEOF(StreamInfo));
    fixupStreamInfo(&pKinesisVideoStream->streamInfo);

    // Fix-up the stream name if not specified
    if (pKinesisVideoStream->streamInfo.name[0] == '\0') {
        STRCPY(pKinesisVideoStream->streamInfo.name, tempStreamName);
    }

    // Create the stream lock
    pKinesisVideoStream->base.lock = pKinesisVideoClient->clientCallbacks.createMutexFn(pKinesisVideoClient->clientCallbacks.customData, TRUE);

    // Create the Ready state condition variable
    pKinesisVideoStream->base.ready = pKinesisVideoClient->clientCallbacks.createConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData);

    // Create the Stream Closed notifier condition variable
    pKinesisVideoStream->streamClosedCondition =
        pKinesisVideoClient->clientCallbacks.createConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData);

    // Create the buffer availability condition variable
    pKinesisVideoStream->bufferAvailabilityCondition =
        pKinesisVideoClient->clientCallbacks.createConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData);
    // Fix-up the buffer duration and replay duration first
    // NOTE(review): the comparison uses MIN_CONTENT_VIEW_BUFFER_DURATION while the assignment uses
    // MIN_VIEW_BUFFER_DURATION - confirm that using two different constants is intended.
    if (pKinesisVideoStream->streamInfo.streamCaps.bufferDuration <= MIN_CONTENT_VIEW_BUFFER_DURATION) {
        pKinesisVideoStream->streamInfo.streamCaps.bufferDuration = MIN_VIEW_BUFFER_DURATION;
    }
    // NOTE(review): the replay duration is clamped against the caller supplied buffer duration from
    // pStreamInfo, not against the fixed-up value stored in the stream - confirm this is intended.
    pKinesisVideoStream->streamInfo.streamCaps.replayDuration = MIN(pStreamInfo->streamCaps.bufferDuration, pStreamInfo->streamCaps.replayDuration);

    // Set the tags pointer to point after the KinesisVideoStream struct
    pKinesisVideoStream->streamInfo.tags = (PTag)((PBYTE)(pKinesisVideoStream + 1));

    // Package the tags after the structure
    CHK_STATUS(packageTags(pStreamInfo->tagCount, pStreamInfo->tags, tagsSize, pKinesisVideoStream->streamInfo.tags, &tagsSize));
    pKinesisVideoStream->streamInfo.tagCount = pStreamInfo->tagCount;
    pCurPnt = (PBYTE) pKinesisVideoStream->streamInfo.tags + tagsSize;

    // Fix-up/store the segment Uid to make it random if NULL
    pKinesisVideoStream->streamInfo.streamCaps.segmentUuid = pCurPnt;
    if (pStreamInfo->streamCaps.segmentUuid == NULL) {
        for (i = 0; i < MKV_SEGMENT_UUID_LEN; i++) {
            pKinesisVideoStream->streamInfo.streamCaps.segmentUuid[i] =
                ((BYTE)(pKinesisVideoClient->clientCallbacks.getRandomNumberFn(pKinesisVideoClient->clientCallbacks.customData) % 0x100));
        }
    } else {
        MEMCPY(pKinesisVideoStream->streamInfo.streamCaps.segmentUuid, pStreamInfo->streamCaps.segmentUuid, MKV_SEGMENT_UUID_LEN);
    }

    // Advance the current pointer
    pCurPnt += MKV_SEGMENT_UUID_LEN;

    // Copy the structures in their entirety
    // NOTE: This will copy the raw pointers, however, we will only use it in the duration of the call.
    MEMCPY(pCurPnt, pStreamInfo->streamCaps.trackInfoList, trackInfoSize);
    pKinesisVideoStream->streamInfo.streamCaps.trackInfoList = (PTrackInfo) pCurPnt;
    fixupTrackInfo(pKinesisVideoStream->streamInfo.streamCaps.trackInfoList, pKinesisVideoStream->streamInfo.streamCaps.trackInfoCount);

    // The frame order coordinator is only needed when frames are not passed through as-is
    pKinesisVideoStream->pFrameOrderCoordinator = NULL;
    if (pKinesisVideoStream->streamInfo.streamCaps.frameOrderingMode != FRAME_ORDER_MODE_PASS_THROUGH) {
        CHK_STATUS(createFrameOrderCoordinator(pKinesisVideoStream, &pKinesisVideoStream->pFrameOrderCoordinator));
    }

    // Move pCurPnt to the end of pKinesisVideoStream->streamInfo.streamCaps.trackInfoList
    // NOTE(review): pCurPnt is not read again after this assignment within this function.
    pCurPnt = (PBYTE)(pKinesisVideoStream->streamInfo.streamCaps.trackInfoList + pKinesisVideoStream->streamInfo.streamCaps.trackInfoCount);

    // Calculate the max items in the view
    maxViewItems = calculateViewItemCount(&pKinesisVideoStream->streamInfo);

    // Create the view
    CHK_STATUS(createContentView(maxViewItems, pKinesisVideoStream->streamInfo.streamCaps.bufferDuration, viewItemRemoved,
                                 TO_CUSTOM_DATA(pKinesisVideoStream), pStreamInfo->streamCaps.viewOverflowPolicy, &pView));
    pKinesisVideoStream->pView = pView;

    // Create an MKV generator
    CHK_STATUS(createPackager(pKinesisVideoStream, &pMkvGenerator));

    // Set the generator object
    pKinesisVideoStream->pMkvGenerator = pMkvGenerator;

    // Package EOS metadata and store
    CHK_STATUS(generateEosMetadata(pKinesisVideoStream));

    // Create the state machine
    CHK_STATUS(createStateMachine(STREAM_STATE_MACHINE_STATES, STREAM_STATE_MACHINE_STATE_COUNT, TO_CUSTOM_DATA(pKinesisVideoStream),
                                  pKinesisVideoClient->clientCallbacks.getCurrentTimeFn, pKinesisVideoClient->clientCallbacks.customData,
                                  &pStateMachine));
    pKinesisVideoStream->base.pStateMachine = pStateMachine;

    // Create the stream upload handle queue
    CHK_STATUS(stackQueueCreate(&pStackQueue));
    pKinesisVideoStream->pUploadInfoQueue = pStackQueue;

    // Create the metadata queue
    CHK_STATUS(stackQueueCreate(&pStackQueue));
    pKinesisVideoStream->pMetadataQueue = pStackQueue;

    // Set the call result to unknown to start
    pKinesisVideoStream->base.result = SERVICE_CALL_RESULT_NOT_SET;

    CHK_STATUS(semaphoreCreate(MAX_PIC_REENTRANCY_COUNT, &pKinesisVideoStream->base.shutdownSemaphore));

    pKinesisVideoStream->base.shutdown = FALSE;

    // Reset the ACK parser
    CHK_STATUS(resetAckParserState(pKinesisVideoStream));

    // Set the new object in the parent object, set the ID and increment the current count
    // NOTE: Make sure we set the stream in the client object before setting the return value and
    // no tear-down flag is set.
    pKinesisVideoClient->streams[pKinesisVideoStream->streamId] = pKinesisVideoStream;
    pKinesisVideoClient->streamCount++;

    // Assign the created object
    *ppKinesisVideoStream = pKinesisVideoStream;

    // Setting this value will ensure we won't tear down the newly created object after this on failure
    tearDownOnError = FALSE;

    // Set the initial state to new
    pKinesisVideoStream->streamState = STREAM_STATE_NEW;

    // We can now unlock the client lock so we won't block it
    pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    clientLocked = FALSE;

    // Store the stream uptime start
    pKinesisVideoStream->diagnostics.createTime =
        pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);

    // Set up the next logging time if enabled
    pKinesisVideoStream->diagnostics.nextLoggingTime =
        pKinesisVideoStream->diagnostics.createTime + pKinesisVideoClient->deviceInfo.clientInfo.metricLoggingPeriod;

    // Call to transition the state machine
    // NOTE: A failure here is returned to the caller but the stream stays registered with the client
    // and remains assigned to *ppKinesisVideoStream as tearDownOnError has already been cleared.
    CHK_STATUS(stepStateMachine(pKinesisVideoStream->base.pStateMachine));

CleanUp:

    if (clientLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
    }

    // The client can be NULL here when the initial argument check failed. In that case no lock has been
    // taken and no stream has been allocated, so there is nothing to tear down and the client
    // must not be dereferenced to acquire the stream list lock.
    if (STATUS_FAILED(retStatus) && tearDownOnError && pKinesisVideoClient != NULL) {
        if (!clientStreamsListLocked) {
            pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData,
                                                             pKinesisVideoClient->base.streamListLock);
            clientStreamsListLocked = TRUE;
        }
        freeStream(pKinesisVideoStream);
    }

    // Need to be under streamListLock while calling freeStream() so wait until after that check to unlock
    if (clientStreamsListLocked) {
        pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.streamListLock);
    }

    LEAVES();
    return retStatus;
}