STATUS createStreamCurl()

in src/source/CurlApiCallbacks.c [912:1014]


/**
 * Issues the CreateStream call on a dedicated handler thread.
 *
 * Builds the JSON request body from the stream parameters, validates the credentials carried in the
 * service call context, creates a POST curl request against controlPlaneUrl + CREATE_API_POSTFIX and
 * starts a detached thread running createStreamCurlHandler. The request is recorded in the
 * active-requests table under the stream handle taken from pServiceCallContext->customData.
 *
 * @param customData - the PCurlApiCallbacks object, passed as an opaque 64 bit value
 * @param deviceName - device name formatted into the request body
 * @param streamName - stream name formatted into the request body
 * @param contentType - content type formatted into the request body
 * @param kmsKeyId - optional KMS key id; NULL or an empty string omits the KMS parameter
 * @param retentionPeriod - retention period; divided by HUNDREDS_OF_NANOS_IN_AN_HOUR to get hours
 * @param pServiceCallContext - service call context supplying the auth info, timeout, call-after time
 *                              and the stream handle
 *
 * NOTE(review): deviceName, streamName and contentType are formatted without a NULL check here;
 * presumably the caller guarantees they are valid strings - confirm.
 *
 * @return STATUS_SUCCESS on success. STATUS_INVALID_ARG on a NULL callbacks object, callbacks provider,
 *         call context or auth info, or when the request body / URL does not fit the local buffers.
 *         STATUS_STREAM_BEING_SHUTDOWN if the stream is listed as shutting down. Otherwise the status
 *         of the first failing step.
 */
STATUS createStreamCurl(UINT64 customData, PCHAR deviceName, PCHAR streamName, PCHAR contentType, PCHAR kmsKeyId, UINT64 retentionPeriod,
                        PServiceCallContext pServiceCallContext)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    CHAR kmsKey[MAX_JSON_KMS_KEY_ID_STRING_LEN];
    CHAR paramsJson[MAX_JSON_PARAMETER_STRING_LEN];
    CHAR url[MAX_PATH_LEN + 1];
    UINT64 retentionInHours, currentTime;
    // Result of the formatting calls, used to detect truncation
    int written;
    PAwsCredentials pCredentials = NULL;
    TID threadId = INVALID_TID_VALUE;
    PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;
    PCallbacksProvider pCallbacksProvider = NULL;
    PCurlRequest pCurlRequest = NULL;
    BOOL startLocked = FALSE, requestLocked = FALSE, shutdownLocked = FALSE, streamShuttingDown = FALSE;
    STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;

    // The auth info is dereferenced below so it is validated together with the other inputs
    CHK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL && pServiceCallContext != NULL &&
            pServiceCallContext->pAuthInfo != NULL,
        STATUS_INVALID_ARG);
    pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;

    streamHandle = (STREAM_HANDLE) pServiceCallContext->customData;

    // Prepare the JSON string for the KMS param.
    // NOTE: the kms key id is optional
    if (kmsKeyId != NULL && kmsKeyId[0] != '\0') {
        written = SNPRINTF(kmsKey, ARRAY_SIZE(kmsKey), KMS_KEY_PARAM_JSON_TEMPLATE, kmsKeyId);
        // A negative result or one that does not fit means the fragment is unusable
        CHK(written >= 0 && (UINT64) written < ARRAY_SIZE(kmsKey), STATUS_INVALID_ARG);
    } else {
        // Empty string
        kmsKey[0] = '\0';
    }

    // Expressed in hours
    retentionInHours = (UINT64)(retentionPeriod / HUNDREDS_OF_NANOS_IN_AN_HOUR);
    written =
        SNPRINTF(paramsJson, ARRAY_SIZE(paramsJson), CREATE_STREAM_PARAM_JSON_TEMPLATE, deviceName, streamName, contentType, kmsKey, retentionInHours);
    // Refuse to send a truncated (and therefore malformed) JSON body
    CHK(written >= 0 && (UINT64) written < ARRAY_SIZE(paramsJson), STATUS_INVALID_ARG);

    // Validate the credentials
    CHK_STATUS(deserializeAwsCredentials(pServiceCallContext->pAuthInfo->data));
    pCredentials = (PAwsCredentials) pServiceCallContext->pAuthInfo->data;
    CHK(pCredentials->version <= AWS_CREDENTIALS_CURRENT_VERSION, STATUS_INVALID_AWS_CREDENTIALS_VERSION);
    CHK(pCredentials->size == pServiceCallContext->pAuthInfo->size, STATUS_INTERNAL_ERROR);

    // Create the API url. The write is bounded so an over-long control plane URL
    // is reported as an error instead of overrunning the local buffer.
    written = SNPRINTF(url, ARRAY_SIZE(url), "%s%s", pCurlApiCallbacks->controlPlaneUrl, CREATE_API_POSTFIX);
    CHK(written >= 0 && (UINT64) written < ARRAY_SIZE(url), STATUS_INVALID_ARG);

    // Create a request object
    currentTime = pCallbacksProvider->clientCallbacks.getCurrentTimeFn(pCallbacksProvider->clientCallbacks.customData);
    CHK_STATUS(createCurlRequest(HTTP_REQUEST_VERB_POST, url, paramsJson, streamHandle, pCurlApiCallbacks->region, currentTime,
                                 CURL_API_DEFAULT_CONNECTION_TIMEOUT, pServiceCallContext->timeout, pServiceCallContext->callAfter,
                                 pCurlApiCallbacks->certPath, pCredentials, pCurlApiCallbacks, &pCurlRequest));

    // Set the necessary headers
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "user-agent", 0, pCurlApiCallbacks->userAgent, 0));

    // The active requests lock is held from here until the cleanup section
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
    requestLocked = TRUE;

    // Under the shutdown lock, check whether this stream is listed as shutting down
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
    shutdownLocked = TRUE;
    CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &streamShuttingDown));
    pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
    shutdownLocked = FALSE;
    CHK(!streamShuttingDown, STATUS_STREAM_BEING_SHUTDOWN);

    // Lock the startup mutex so the created thread will wait until we are done with bookkeeping
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    startLocked = TRUE;

    // Start the request/response thread
    CHK_STATUS(THREAD_CREATE(&threadId, createStreamCurlHandler, (PVOID) pCurlRequest));
    CHK_STATUS(THREAD_DETACH(threadId));

    // Set the thread ID in the request, add to the hash table
    pCurlRequest->threadId = threadId;

    CHK_STATUS(hashTablePut(pCurlApiCallbacks->pActiveRequests, streamHandle, (UINT64) pCurlRequest));

CleanUp:

    if (STATUS_FAILED(retStatus)) {
        if (IS_VALID_TID_VALUE(threadId)) {
            THREAD_CANCEL(threadId);
        }

        // NOTE(review): if the failure happened after startLock was taken, the request is freed
        // here while that lock is still held - confirm freeCurlRequest tolerates this.
        freeCurlRequest(&pCurlRequest);
    } else if (startLocked) {
        // Release the lock to let the awaiting handler thread to continue
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    }

    // Only still set if the shutting-down lookup failed while the shutdown lock was held
    if (shutdownLocked) {
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
    }

    if (requestLocked) {
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
    }

    LEAVES();
    return retStatus;
}