in src/source/CurlApiCallbacks.c [1993:2103]
STATUS putStreamCurl(UINT64 customData, PCHAR streamName, PCHAR containerType, UINT64 startTimestamp, BOOL absoluteFragmentTimestamp,
BOOL acksEnabled, PCHAR streamingEndpoint, PServiceCallContext pServiceCallContext)
{
UNUSED_PARAM(containerType);
ENTERS();
STATUS retStatus = STATUS_SUCCESS, status = STATUS_SUCCESS;
CHAR url[MAX_PATH_LEN + 1];
CHAR startTimestampStr[MAX_TIMESTAMP_STR_LEN + 1] = {0};
PAwsCredentials pCredentials = NULL;
TID threadId = INVALID_TID_VALUE;
PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;
PCurlRequest pCurlRequest = NULL;
UINT64 startTimestampMillis, currentTime;
PCallbacksProvider pCallbacksProvider = NULL;
BOOL startLocked = FALSE, uploadsLocked = FALSE, streamShuttingDown = FALSE, shutdownLocked = FALSE;
STREAM_HANDLE streamHandle;
CHECK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL && pServiceCallContext != NULL);
pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;
// Validate the credentials
CHK_STATUS(deserializeAwsCredentials(pServiceCallContext->pAuthInfo->data));
pCredentials = (PAwsCredentials) pServiceCallContext->pAuthInfo->data;
CHK(pCredentials->version <= AWS_CREDENTIALS_CURRENT_VERSION, STATUS_INVALID_AWS_CREDENTIALS_VERSION);
CHK(pCredentials->size == pServiceCallContext->pAuthInfo->size, STATUS_INTERNAL_ERROR);
streamHandle = (STREAM_HANDLE) pServiceCallContext->customData;
// Create the API url
STRCPY(url, streamingEndpoint);
STRCAT(url, PUT_MEDIA_API_POSTFIX);
// Lock for the guards for exclusive access
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeUploadsLock);
uploadsLocked = TRUE;
pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
pCurlApiCallbacks->shutdownLock);
shutdownLocked = TRUE;
CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &streamShuttingDown));
pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
pCurlApiCallbacks->shutdownLock);
shutdownLocked = FALSE;
CHK(!streamShuttingDown, STATUS_STREAM_BEING_SHUTDOWN);
// Create a request object
currentTime = pCallbacksProvider->clientCallbacks.getCurrentTimeFn(pCallbacksProvider->clientCallbacks.customData);
CHK_STATUS(createCurlRequest(HTTP_REQUEST_VERB_POST, url, NULL, (STREAM_HANDLE) pServiceCallContext->customData, pCurlApiCallbacks->region,
currentTime, CURL_API_DEFAULT_CONNECTION_TIMEOUT, pServiceCallContext->timeout, pServiceCallContext->callAfter,
pCurlApiCallbacks->certPath, pCredentials, pCurlApiCallbacks, &pCurlRequest));
// Set the necessary headers
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "user-agent", 0, pCurlApiCallbacks->userAgent, 0));
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-stream-name", 0, streamName, 0));
startTimestampMillis = startTimestamp / HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
SNPRINTF(startTimestampStr, MAX_TIMESTAMP_STR_LEN + 1, (PCHAR) "%" PRIu64 ".%" PRIu64, startTimestampMillis / 1000, startTimestampMillis % 1000);
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-producer-start-timestamp", 0, startTimestampStr, 0));
CHK_STATUS(
setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-fragment-acknowledgment-required", 0, (PCHAR)(acksEnabled ? "1" : "0"), 0));
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-fragment-timecode-type", 0,
(PCHAR)(absoluteFragmentTimestamp ? "ABSOLUTE" : "RELATIVE"), 0));
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "transfer-encoding", 0, (PCHAR) "chunked", 0));
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "connection", 0, (PCHAR) "keep-alive", 0));
// Lock the startup mutex so the created thread will wait until we are done with bookkeeping
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
startLocked = TRUE;
// Start the request/response thread
CHK_STATUS(THREAD_CREATE(&threadId, putStreamCurlHandler, (PVOID) pCurlRequest));
CHK_STATUS(THREAD_DETACH(threadId));
// Set the thread ID in the request, add to the hash table
pCurlRequest->threadId = threadId;
CHK_STATUS(doubleListInsertItemTail(pCurlApiCallbacks->pActiveUploads, (UINT64) pCurlRequest));
CleanUp:
if (STATUS_FAILED(retStatus)) {
if (IS_VALID_TID_VALUE(threadId)) {
THREAD_CANCEL(threadId);
}
freeCurlRequest(&pCurlRequest);
} else if (!streamShuttingDown) {
status = putStreamResultEvent(pCurlRequest->streamHandle, STATUS_FAILED(retStatus) ? SERVICE_CALL_UNKNOWN : SERVICE_CALL_RESULT_OK,
pCurlRequest->uploadHandle);
// Bubble the notification to potential listeners
notifyCallResult(pCallbacksProvider, status, (STREAM_HANDLE) pServiceCallContext->customData);
if (startLocked) {
// Release the lock to let the awaiting handler thread to continue
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
}
}
// unlock in case if locked.
if (shutdownLocked) {
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
}
// unlock in case if locked.
if (uploadsLocked) {
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeUploadsLock);
}
return retStatus;
}