in src/client/src/Stream.c [18:321]
STATUS createStream(PKinesisVideoClient pKinesisVideoClient, PStreamInfo pStreamInfo, PKinesisVideoStream* ppKinesisVideoStream)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PKinesisVideoStream pKinesisVideoStream = NULL;
PContentView pView = NULL;
PStackQueue pStackQueue = NULL;
PMkvGenerator pMkvGenerator = NULL;
PStateMachine pStateMachine = NULL;
UINT32 allocationSize, maxViewItems, i;
PBYTE pCurPnt = NULL;
BOOL clientLocked = FALSE, clientStreamsListLocked = FALSE;
BOOL tearDownOnError = TRUE;
CHAR tempStreamName[MAX_STREAM_NAME_LEN];
UINT32 trackInfoSize, tagsSize;
CHK(pKinesisVideoClient != NULL && ppKinesisVideoStream != NULL, STATUS_NULL_ARG);
// Set the return stream pointer first
*ppKinesisVideoStream = NULL;
// Validate the input structs
CHK_STATUS(validateStreamInfo(pStreamInfo, &pKinesisVideoClient->clientCallbacks));
logStreamInfo(pStreamInfo);
// Lock the client streams list lock because we will iterate over current streams + add more streams
// We follow the principle that the streamListLock is *never* acquired inside a client or streams lock,
// always outside to prevent deadlock
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.streamListLock);
clientStreamsListLocked = TRUE;
// Lock the client
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
clientLocked = TRUE;
// Check for the stream count
CHK(pKinesisVideoClient->streamCount < pKinesisVideoClient->deviceInfo.streamCount, STATUS_MAX_STREAM_COUNT);
// Fix-up the stream name if not specified
if (pStreamInfo->name[0] == '\0') {
createRandomName(tempStreamName, DEFAULT_STREAM_NAME_LEN, pKinesisVideoClient->clientCallbacks.getRandomNumberFn,
pKinesisVideoClient->clientCallbacks.customData);
} else {
// Copy the stream name otherwise
// NOTE: Stream name length has already been validated
STRCPY(tempStreamName, pStreamInfo->name);
}
// Check if a stream by that name already exists
for (i = 0; i < pKinesisVideoClient->deviceInfo.streamCount; i++) {
if (NULL != pKinesisVideoClient->streams[i]) {
CHK(0 != STRCMP(pKinesisVideoClient->streams[i]->streamInfo.name, tempStreamName), STATUS_DUPLICATE_STREAM_NAME);
}
}
// Space for track info bits
trackInfoSize = SIZEOF(TrackInfo) * pStreamInfo->streamCaps.trackInfoCount;
// Get the max tags structure size
CHK_STATUS(packageTags(pStreamInfo->tagCount, pStreamInfo->tags, 0, NULL, &tagsSize));
// Allocate the main struct
// NOTE: The calloc will Zero the fields
// We are allocating more than the structure size to accommodate for
// the variable size buffers which will come at the end and for the tags
allocationSize = SIZEOF(KinesisVideoStream) + tagsSize + trackInfoSize + MKV_SEGMENT_UUID_LEN;
pKinesisVideoStream = (PKinesisVideoStream) MEMCALLOC(1, allocationSize);
CHK(pKinesisVideoStream != NULL, STATUS_NOT_ENOUGH_MEMORY);
// Set the stream id first available slot so during the teardown it will clean it up.
for (i = 0; i < pKinesisVideoClient->deviceInfo.streamCount; i++) {
if (NULL == pKinesisVideoClient->streams[i]) {
// Found an empty slot
pKinesisVideoStream->streamId = i;
break;
}
}
// set the maximum frame size observed to 0
pKinesisVideoStream->maxFrameSizeSeen = 0;
// Set the back reference
pKinesisVideoStream->pKinesisVideoClient = pKinesisVideoClient;
// Set the basic info
pKinesisVideoStream->base.identifier = KINESIS_VIDEO_OBJECT_IDENTIFIER_STREAM;
pKinesisVideoStream->base.version = STREAM_CURRENT_VERSION;
// Set the initial state and the stream status
pKinesisVideoStream->streamState = STREAM_STATE_NONE;
pKinesisVideoStream->streamStatus = STREAM_STATUS_CREATING;
// Stream is not ready, not stopped and not closed
pKinesisVideoStream->streamStopped = FALSE;
pKinesisVideoStream->streamReady = FALSE;
pKinesisVideoStream->streamClosed = FALSE;
// Set the stream start timestamps and index
pKinesisVideoStream->newSessionTimestamp = INVALID_TIMESTAMP_VALUE;
pKinesisVideoStream->newSessionIndex = INVALID_VIEW_INDEX_VALUE;
// Not in a grace period
pKinesisVideoStream->gracePeriod = FALSE;
// Shouldn't reset the generator on next key frame
pKinesisVideoStream->resetGeneratorOnKeyFrame = FALSE;
// Skip non-key frames at start
pKinesisVideoStream->skipNonKeyFrames = TRUE;
// Shouldn't reset the generator so set invalid time
pKinesisVideoStream->resetGeneratorTime = INVALID_TIMESTAMP_VALUE;
// No connections have been dropped as this is a new stream
pKinesisVideoStream->connectionState = UPLOAD_CONNECTION_STATE_OK;
// Set the last frame EoFr indicator
pKinesisVideoStream->eofrFrame = FALSE;
// Set last PutFrame timestamp to invalid time value
pKinesisVideoStream->lastPutFrameTimestamp = INVALID_TIMESTAMP_VALUE;
// Set the initial diagnostics information from the defaults
pKinesisVideoStream->diagnostics.currentFrameRate = pStreamInfo->streamCaps.frameRate;
pKinesisVideoStream->diagnostics.elementaryFrameRate = pStreamInfo->streamCaps.frameRate;
pKinesisVideoStream->diagnostics.currentTransferRate = pStreamInfo->streamCaps.avgBandwidthBps;
pKinesisVideoStream->diagnostics.accumulatedByteCount = 0;
pKinesisVideoStream->diagnostics.lastFrameRateTimestamp = pKinesisVideoStream->diagnostics.lastTransferRateTimestamp = 0;
// Set the trackers
pKinesisVideoStream->eosTracker.size = 0;
pKinesisVideoStream->eosTracker.offset = 0;
pKinesisVideoStream->eosTracker.send = FALSE;
pKinesisVideoStream->eosTracker.data = NULL;
pKinesisVideoStream->metadataTracker.size = 0;
pKinesisVideoStream->metadataTracker.offset = 0;
pKinesisVideoStream->metadataTracker.send = FALSE;
pKinesisVideoStream->metadataTracker.data = NULL;
// initialize streamingAuthInfo expiration
pKinesisVideoStream->streamingAuthInfo.expiration = INVALID_TIMESTAMP_VALUE;
// Reset the current view item
MEMSET(&pKinesisVideoStream->curViewItem, 0x00, SIZEOF(CurrentViewItem));
pKinesisVideoStream->curViewItem.viewItem.handle = INVALID_ALLOCATION_HANDLE_VALUE;
// Copy the structures in their entirety
MEMCPY(&pKinesisVideoStream->streamInfo, pStreamInfo, SIZEOF(StreamInfo));
fixupStreamInfo(&pKinesisVideoStream->streamInfo);
// Fix-up the stream name if not specified
if (pKinesisVideoStream->streamInfo.name[0] == '\0') {
STRCPY(pKinesisVideoStream->streamInfo.name, tempStreamName);
}
// Create the stream lock
pKinesisVideoStream->base.lock = pKinesisVideoClient->clientCallbacks.createMutexFn(pKinesisVideoClient->clientCallbacks.customData, TRUE);
// Create the Ready state condition variable
pKinesisVideoStream->base.ready = pKinesisVideoClient->clientCallbacks.createConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData);
// Create the Stream Closed notifier condition variable
pKinesisVideoStream->streamClosedCondition =
pKinesisVideoClient->clientCallbacks.createConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData);
// Create the buffer availability condition variable
pKinesisVideoStream->bufferAvailabilityCondition =
pKinesisVideoClient->clientCallbacks.createConditionVariableFn(pKinesisVideoClient->clientCallbacks.customData);
// Fix-up the buffer duration and replay duration first
if (pKinesisVideoStream->streamInfo.streamCaps.bufferDuration <= MIN_CONTENT_VIEW_BUFFER_DURATION) {
pKinesisVideoStream->streamInfo.streamCaps.bufferDuration = MIN_VIEW_BUFFER_DURATION;
}
pKinesisVideoStream->streamInfo.streamCaps.replayDuration = MIN(pStreamInfo->streamCaps.bufferDuration, pStreamInfo->streamCaps.replayDuration);
// Set the tags pointer to point after the KinesisVideoStream struct
pKinesisVideoStream->streamInfo.tags = (PTag)((PBYTE)(pKinesisVideoStream + 1));
// Package the tags after the structure
CHK_STATUS(packageTags(pStreamInfo->tagCount, pStreamInfo->tags, tagsSize, pKinesisVideoStream->streamInfo.tags, &tagsSize));
pKinesisVideoStream->streamInfo.tagCount = pStreamInfo->tagCount;
pCurPnt = (PBYTE) pKinesisVideoStream->streamInfo.tags + tagsSize;
// Fix-up/store the segment Uid to make it random if NULL
pKinesisVideoStream->streamInfo.streamCaps.segmentUuid = pCurPnt;
if (pStreamInfo->streamCaps.segmentUuid == NULL) {
for (i = 0; i < MKV_SEGMENT_UUID_LEN; i++) {
pKinesisVideoStream->streamInfo.streamCaps.segmentUuid[i] =
((BYTE)(pKinesisVideoClient->clientCallbacks.getRandomNumberFn(pKinesisVideoClient->clientCallbacks.customData) % 0x100));
}
} else {
MEMCPY(pKinesisVideoStream->streamInfo.streamCaps.segmentUuid, pStreamInfo->streamCaps.segmentUuid, MKV_SEGMENT_UUID_LEN);
}
// Advance the current pointer
pCurPnt += MKV_SEGMENT_UUID_LEN;
// Copy the structures in their entirety
// NOTE: This will copy the raw pointers, however, we will only use it in the duration of the call.
MEMCPY(pCurPnt, pStreamInfo->streamCaps.trackInfoList, trackInfoSize);
pKinesisVideoStream->streamInfo.streamCaps.trackInfoList = (PTrackInfo) pCurPnt;
fixupTrackInfo(pKinesisVideoStream->streamInfo.streamCaps.trackInfoList, pKinesisVideoStream->streamInfo.streamCaps.trackInfoCount);
pKinesisVideoStream->pFrameOrderCoordinator = NULL;
if (pKinesisVideoStream->streamInfo.streamCaps.frameOrderingMode != FRAME_ORDER_MODE_PASS_THROUGH) {
CHK_STATUS(createFrameOrderCoordinator(pKinesisVideoStream, &pKinesisVideoStream->pFrameOrderCoordinator));
}
// Move pCurPnt to the end of pKinesisVideoStream->streamInfo.streamCaps.trackInfoList
pCurPnt = (PBYTE)(pKinesisVideoStream->streamInfo.streamCaps.trackInfoList + pKinesisVideoStream->streamInfo.streamCaps.trackInfoCount);
// Calculate the max items in the view
maxViewItems = calculateViewItemCount(&pKinesisVideoStream->streamInfo);
// Create the view
CHK_STATUS(createContentView(maxViewItems, pKinesisVideoStream->streamInfo.streamCaps.bufferDuration, viewItemRemoved,
TO_CUSTOM_DATA(pKinesisVideoStream), pStreamInfo->streamCaps.viewOverflowPolicy, &pView));
pKinesisVideoStream->pView = pView;
// Create an MKV generator
CHK_STATUS(createPackager(pKinesisVideoStream, &pMkvGenerator));
// Set the generator object
pKinesisVideoStream->pMkvGenerator = pMkvGenerator;
// Package EOS metadata and store
CHK_STATUS(generateEosMetadata(pKinesisVideoStream));
// Create the state machine
CHK_STATUS(createStateMachine(STREAM_STATE_MACHINE_STATES, STREAM_STATE_MACHINE_STATE_COUNT, TO_CUSTOM_DATA(pKinesisVideoStream),
pKinesisVideoClient->clientCallbacks.getCurrentTimeFn, pKinesisVideoClient->clientCallbacks.customData,
&pStateMachine));
pKinesisVideoStream->base.pStateMachine = pStateMachine;
// Create the stream upload handle queue
CHK_STATUS(stackQueueCreate(&pStackQueue));
pKinesisVideoStream->pUploadInfoQueue = pStackQueue;
// Create the metadata queue
CHK_STATUS(stackQueueCreate(&pStackQueue));
pKinesisVideoStream->pMetadataQueue = pStackQueue;
// Set the call result to unknown to start
pKinesisVideoStream->base.result = SERVICE_CALL_RESULT_NOT_SET;
CHK_STATUS(semaphoreCreate(MAX_PIC_REENTRANCY_COUNT, &pKinesisVideoStream->base.shutdownSemaphore));
pKinesisVideoStream->base.shutdown = FALSE;
// Reset the ACK parser
CHK_STATUS(resetAckParserState(pKinesisVideoStream));
// Set the new object in the parent object, set the ID and increment the current count
// NOTE: Make sure we set the stream in the client object before setting the return value and
// no tear-down flag is set.
pKinesisVideoClient->streams[pKinesisVideoStream->streamId] = pKinesisVideoStream;
pKinesisVideoClient->streamCount++;
// Assign the created object
*ppKinesisVideoStream = pKinesisVideoStream;
// Setting this value will ensure we won't tear down the newly created object after this on failure
tearDownOnError = FALSE;
// Set the initial state to new
pKinesisVideoStream->streamState = STREAM_STATE_NEW;
// We can now unlock the client lock so we won't block it
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
clientLocked = FALSE;
// Store the stream uptime start
pKinesisVideoStream->diagnostics.createTime =
pKinesisVideoClient->clientCallbacks.getCurrentTimeFn(pKinesisVideoClient->clientCallbacks.customData);
// Set up the next logging time if enabled
pKinesisVideoStream->diagnostics.nextLoggingTime =
pKinesisVideoStream->diagnostics.createTime + pKinesisVideoClient->deviceInfo.clientInfo.metricLoggingPeriod;
// Call to transition the state machine
CHK_STATUS(stepStateMachine(pKinesisVideoStream->base.pStateMachine));
CleanUp:
if (clientLocked) {
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.lock);
}
if (STATUS_FAILED(retStatus) && tearDownOnError) {
if (!clientStreamsListLocked) {
pKinesisVideoClient->clientCallbacks.lockMutexFn(pKinesisVideoClient->clientCallbacks.customData,
pKinesisVideoClient->base.streamListLock);
clientStreamsListLocked = TRUE;
}
freeStream(pKinesisVideoStream);
}
// Need to be under streamListLock while calling freeStream() so wait until after that check to unlock
if (clientStreamsListLocked) {
pKinesisVideoClient->clientCallbacks.unlockMutexFn(pKinesisVideoClient->clientCallbacks.customData, pKinesisVideoClient->base.streamListLock);
}
LEAVES();
return retStatus;
}