STATUS putStreamCurl()

in src/source/CurlApiCallbacks.c [1993:2103]


/**
 * Kicks off a PutMedia request for a stream on a dedicated, detached thread.
 *
 * The function validates the credentials carried in the service call context, builds the
 * PutMedia URL from the streaming endpoint, refuses to start if the stream is registered as
 * shutting down, creates and populates the curl request, spawns the handler thread and
 * records the request in the active uploads list. On success the result is reported through
 * putStreamResultEvent() and notifyCallResult().
 *
 * @param customData - Custom data which is cast to PCurlApiCallbacks
 * @param streamName - Value sent in the "x-amzn-stream-name" header
 * @param containerType - Unused
 * @param startTimestamp - Producer start timestamp; divided by HUNDREDS_OF_NANOS_IN_A_MILLISECOND
 *                         and sent as "<seconds>.<milliseconds>" in the
 *                         "x-amzn-producer-start-timestamp" header
 * @param absoluteFragmentTimestamp - Selects "ABSOLUTE" vs "RELATIVE" for the
 *                                    "x-amzn-fragment-timecode-type" header
 * @param acksEnabled - Selects "1" vs "0" for the "x-amzn-fragment-acknowledgment-required" header
 * @param streamingEndpoint - Base endpoint to which PUT_MEDIA_API_POSTFIX is appended
 * @param pServiceCallContext - Service call context carrying the auth info, timeouts and the
 *                              stream handle (in customData)
 *
 * @return STATUS_SUCCESS when the upload thread was started and registered, otherwise the
 *         failing status (e.g. STATUS_STREAM_BEING_SHUTDOWN, STATUS_INVALID_AWS_CREDENTIALS_VERSION,
 *         STATUS_INTERNAL_ERROR)
 */
STATUS putStreamCurl(UINT64 customData, PCHAR streamName, PCHAR containerType, UINT64 startTimestamp, BOOL absoluteFragmentTimestamp,
                     BOOL acksEnabled, PCHAR streamingEndpoint, PServiceCallContext pServiceCallContext)
{
    UNUSED_PARAM(containerType);

    ENTERS();
    STATUS retStatus = STATUS_SUCCESS, status = STATUS_SUCCESS;
    CHAR url[MAX_PATH_LEN + 1];
    CHAR startTimestampStr[MAX_TIMESTAMP_STR_LEN + 1] = {0};
    PAwsCredentials pCredentials = NULL;
    TID threadId = INVALID_TID_VALUE;
    PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;
    PCurlRequest pCurlRequest = NULL;
    UINT64 startTimestampMillis, currentTime;
    PCallbacksProvider pCallbacksProvider = NULL;
    BOOL startLocked = FALSE, uploadsLocked = FALSE, streamShuttingDown = FALSE, shutdownLocked = FALSE;
    STREAM_HANDLE streamHandle;
    // Holds the snprintf result, hence plain int
    int urlLen;

    CHECK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL && pServiceCallContext != NULL);
    pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;

    // Validate the credentials
    // NOTE(review): pAuthInfo is dereferenced without a NULL check - confirm callers always set it
    CHK_STATUS(deserializeAwsCredentials(pServiceCallContext->pAuthInfo->data));
    pCredentials = (PAwsCredentials) pServiceCallContext->pAuthInfo->data;
    CHK(pCredentials->version <= AWS_CREDENTIALS_CURRENT_VERSION, STATUS_INVALID_AWS_CREDENTIALS_VERSION);
    CHK(pCredentials->size == pServiceCallContext->pAuthInfo->size, STATUS_INTERNAL_ERROR);

    streamHandle = (STREAM_HANDLE) pServiceCallContext->customData;

    // Create the API url. The endpoint length is not under our control so the
    // concatenation is bounded and a truncated/failed format is rejected instead of
    // overflowing the stack buffer.
    CHK(streamingEndpoint != NULL, STATUS_INTERNAL_ERROR);
    urlLen = SNPRINTF(url, MAX_PATH_LEN + 1, (PCHAR) "%s%s", streamingEndpoint, PUT_MEDIA_API_POSTFIX);
    CHK(urlLen >= 0 && urlLen <= MAX_PATH_LEN, STATUS_INTERNAL_ERROR);

    // Lock for the guards for exclusive access
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeUploadsLock);
    uploadsLocked = TRUE;

    // Check under the shutdown lock whether this stream is registered as shutting down
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
    shutdownLocked = TRUE;
    CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &streamShuttingDown));
    pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
    shutdownLocked = FALSE;
    CHK(!streamShuttingDown, STATUS_STREAM_BEING_SHUTDOWN);

    // Create a request object
    currentTime = pCallbacksProvider->clientCallbacks.getCurrentTimeFn(pCallbacksProvider->clientCallbacks.customData);
    CHK_STATUS(createCurlRequest(HTTP_REQUEST_VERB_POST, url, NULL, streamHandle, pCurlApiCallbacks->region, currentTime,
                                 CURL_API_DEFAULT_CONNECTION_TIMEOUT, pServiceCallContext->timeout, pServiceCallContext->callAfter,
                                 pCurlApiCallbacks->certPath, pCredentials, pCurlApiCallbacks, &pCurlRequest));

    // Set the necessary headers
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "user-agent", 0, pCurlApiCallbacks->userAgent, 0));
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-stream-name", 0, streamName, 0));

    // Format as "<seconds>.<milliseconds>". The fractional part must be zero-padded to three
    // digits, otherwise e.g. 5 ms would be rendered as ".5" and read back as 500 ms.
    startTimestampMillis = startTimestamp / HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
    SNPRINTF(startTimestampStr, MAX_TIMESTAMP_STR_LEN + 1, (PCHAR) "%" PRIu64 ".%03" PRIu64, startTimestampMillis / 1000,
             startTimestampMillis % 1000);
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-producer-start-timestamp", 0, startTimestampStr, 0));
    CHK_STATUS(
        setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-fragment-acknowledgment-required", 0, (PCHAR)(acksEnabled ? "1" : "0"), 0));
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "x-amzn-fragment-timecode-type", 0,
                                (PCHAR)(absoluteFragmentTimestamp ? "ABSOLUTE" : "RELATIVE"), 0));
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "transfer-encoding", 0, (PCHAR) "chunked", 0));
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "connection", 0, (PCHAR) "keep-alive", 0));

    // Lock the startup mutex so the created thread will wait until we are done with bookkeeping
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    startLocked = TRUE;

    // Start the request/response thread
    CHK_STATUS(THREAD_CREATE(&threadId, putStreamCurlHandler, (PVOID) pCurlRequest));
    CHK_STATUS(THREAD_DETACH(threadId));

    // Set the thread ID in the request, add to the active uploads list
    pCurlRequest->threadId = threadId;

    CHK_STATUS(doubleListInsertItemTail(pCurlApiCallbacks->pActiveUploads, (UINT64) pCurlRequest));

CleanUp:

    if (STATUS_FAILED(retStatus)) {
        // NOTE(review): when the failure happens after the start lock was taken, the request
        // is freed here while startLock is still held and right after the cancel request -
        // confirm freeCurlRequest and the handler thread tolerate this.
        if (IS_VALID_TID_VALUE(threadId)) {
            THREAD_CANCEL(threadId);
        }

        freeCurlRequest(&pCurlRequest);
    } else if (!streamShuttingDown) {
        // retStatus is known to be successful in this branch so the call result is always OK
        status = putStreamResultEvent(pCurlRequest->streamHandle, SERVICE_CALL_RESULT_OK, pCurlRequest->uploadHandle);

        // Bubble the notification to potential listeners
        notifyCallResult(pCallbacksProvider, status, streamHandle);

        if (startLocked) {
            // Release the lock to let the awaiting handler thread to continue
            pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
        }
    }

    // unlock in case if locked.
    if (shutdownLocked) {
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
    }

    // unlock in case if locked.
    if (uploadsLocked) {
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeUploadsLock);
    }

    return retStatus;
}