in src/source/CurlApiCallbacks.c [912:1014]
STATUS createStreamCurl(UINT64 customData, PCHAR deviceName, PCHAR streamName, PCHAR contentType, PCHAR kmsKeyId, UINT64 retentionPeriod,
PServiceCallContext pServiceCallContext)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
CHAR kmsKey[MAX_JSON_KMS_KEY_ID_STRING_LEN];
CHAR paramsJson[MAX_JSON_PARAMETER_STRING_LEN];
CHAR url[MAX_PATH_LEN + 1];
UINT64 retentionInHours, currentTime;
PAwsCredentials pCredentials = NULL;
TID threadId = INVALID_TID_VALUE;
PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;
PCallbacksProvider pCallbacksProvider = NULL;
PCurlRequest pCurlRequest = NULL;
BOOL startLocked = FALSE, requestLocked = FALSE, shutdownLocked = FALSE, streamShuttingDown = FALSE;
STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
CHK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL && pServiceCallContext != NULL, STATUS_INVALID_ARG);
pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;
streamHandle = (STREAM_HANDLE) pServiceCallContext->customData;
// Prepare the JSON string for the KMS param.
// NOTE: the kms key id is optional
if (kmsKeyId != NULL && kmsKeyId[0] != '\0') {
SNPRINTF(kmsKey, ARRAY_SIZE(kmsKey), KMS_KEY_PARAM_JSON_TEMPLATE, kmsKeyId);
} else {
// Empty string
kmsKey[0] = '\0';
}
// Expressed in hours
retentionInHours = (UINT64)(retentionPeriod / HUNDREDS_OF_NANOS_IN_AN_HOUR);
SNPRINTF(paramsJson, ARRAY_SIZE(paramsJson), CREATE_STREAM_PARAM_JSON_TEMPLATE, deviceName, streamName, contentType, kmsKey, retentionInHours);
// Validate the credentials
CHK_STATUS(deserializeAwsCredentials(pServiceCallContext->pAuthInfo->data));
pCredentials = (PAwsCredentials) pServiceCallContext->pAuthInfo->data;
CHK(pCredentials->version <= AWS_CREDENTIALS_CURRENT_VERSION, STATUS_INVALID_AWS_CREDENTIALS_VERSION);
CHK(pCredentials->size == pServiceCallContext->pAuthInfo->size, STATUS_INTERNAL_ERROR);
// Create the API url
STRCPY(url, pCurlApiCallbacks->controlPlaneUrl);
STRCAT(url, CREATE_API_POSTFIX);
// Create a request object
currentTime = pCallbacksProvider->clientCallbacks.getCurrentTimeFn(pCallbacksProvider->clientCallbacks.customData);
CHK_STATUS(createCurlRequest(HTTP_REQUEST_VERB_POST, url, paramsJson, streamHandle, pCurlApiCallbacks->region, currentTime,
CURL_API_DEFAULT_CONNECTION_TIMEOUT, pServiceCallContext->timeout, pServiceCallContext->callAfter,
pCurlApiCallbacks->certPath, pCredentials, pCurlApiCallbacks, &pCurlRequest));
// Set the necessary headers
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "user-agent", 0, pCurlApiCallbacks->userAgent, 0));
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
requestLocked = TRUE;
pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
pCurlApiCallbacks->shutdownLock);
shutdownLocked = TRUE;
CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &streamShuttingDown));
pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
pCurlApiCallbacks->shutdownLock);
shutdownLocked = FALSE;
CHK(!streamShuttingDown, STATUS_STREAM_BEING_SHUTDOWN);
// Lock the startup mutex so the created thread will wait until we are done with bookkeeping
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
startLocked = TRUE;
// Start the request/response thread
CHK_STATUS(THREAD_CREATE(&threadId, createStreamCurlHandler, (PVOID) pCurlRequest));
CHK_STATUS(THREAD_DETACH(threadId));
// Set the thread ID in the request, add to the hash table
pCurlRequest->threadId = threadId;
CHK_STATUS(hashTablePut(pCurlApiCallbacks->pActiveRequests, streamHandle, (UINT64) pCurlRequest));
CleanUp:
if (STATUS_FAILED(retStatus)) {
if (IS_VALID_TID_VALUE(threadId)) {
THREAD_CANCEL(threadId);
}
freeCurlRequest(&pCurlRequest);
} else if (startLocked) {
// Release the lock to let the awaiting handler thread to continue
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
}
if (shutdownLocked) {
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
}
if (requestLocked) {
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
}
LEAVES();
return retStatus;
}