STATUS tagResourceCurl()

in src/source/CurlApiCallbacks.c [1771:1889]


STATUS tagResourceCurl(UINT64 customData, PCHAR streamArn, UINT32 tagCount, PTag tags, PServiceCallContext pServiceCallContext)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PCHAR paramsJson = NULL;
    PCHAR tagsJson = NULL;
    CHAR url[MAX_PATH_LEN + 1];
    PAwsCredentials pCredentials;
    TID threadId = INVALID_TID_VALUE;
    PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;
    PCurlRequest pCurlRequest = NULL;
    UINT32 i;
    INT32 charsCopied;
    PCHAR pCurPtr;
    PCallbacksProvider pCallbacksProvider = NULL;
    BOOL startLocked = FALSE, requestLocked = FALSE, shutdownLocked = FALSE, streamShuttingDown = FALSE;
    UINT64 currentTime;
    STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;

    CHK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL && pServiceCallContext != NULL, STATUS_INVALID_ARG);
    pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;

    streamHandle = (STREAM_HANDLE) pServiceCallContext->customData;

    CHK(tagCount > 0 && tags != NULL, STATUS_INTERNAL_ERROR);

    // Allocate enough space for the string manipulation. We don't want to reserve stack space for this
    CHK(NULL != (paramsJson = (PCHAR) MEMALLOC(MAX_TAGS_JSON_PARAMETER_STRING_LEN)), STATUS_NOT_ENOUGH_MEMORY);
    CHK(NULL != (tagsJson = (PCHAR) MEMALLOC(MAX_TAGS_JSON_PARAMETER_STRING_LEN)), STATUS_NOT_ENOUGH_MEMORY);

    // Prepare the tags elements
    for (i = 0, pCurPtr = tagsJson; i < tagCount; i++) {
        charsCopied =
            SNPRINTF(pCurPtr, MAX_TAGS_JSON_PARAMETER_STRING_LEN - (pCurPtr - tagsJson), TAG_PARAM_JSON_TEMPLATE, tags[i].name, tags[i].value);
        CHK(charsCopied > 0 && charsCopied < MAX_TAGS_JSON_PARAMETER_STRING_LEN - (pCurPtr - tagsJson), STATUS_INTERNAL_ERROR);
        pCurPtr += charsCopied;
    }

    // Remove the tailing comma
    *(pCurPtr - 1) = '\0';

    // Prepare the json params for the call
    SNPRINTF(paramsJson, MAX_TAGS_JSON_PARAMETER_STRING_LEN, TAG_RESOURCE_PARAM_JSON_TEMPLATE, streamArn, tagsJson);

    // Validate the credentials
    CHK_STATUS(deserializeAwsCredentials(pServiceCallContext->pAuthInfo->data));
    pCredentials = (PAwsCredentials) pServiceCallContext->pAuthInfo->data;
    CHK(pCredentials->version <= AWS_CREDENTIALS_CURRENT_VERSION, STATUS_INVALID_AWS_CREDENTIALS_VERSION);
    CHK(pCredentials->size == pServiceCallContext->pAuthInfo->size, STATUS_INTERNAL_ERROR);

    // Create the API url
    STRCPY(url, pCurlApiCallbacks->controlPlaneUrl);
    STRCAT(url, TAG_RESOURCE_API_POSTFIX);

    // Create a request object
    currentTime = pCallbacksProvider->clientCallbacks.getCurrentTimeFn(pCallbacksProvider->clientCallbacks.customData);
    CHK_STATUS(createCurlRequest(HTTP_REQUEST_VERB_POST, url, paramsJson, streamHandle, pCurlApiCallbacks->region, currentTime,
                                 CURL_API_DEFAULT_CONNECTION_TIMEOUT, pServiceCallContext->timeout, pServiceCallContext->callAfter,
                                 pCurlApiCallbacks->certPath, pCredentials, pCurlApiCallbacks, &pCurlRequest));

    // Set the necessary headers
    CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "user-agent", 0, pCurlApiCallbacks->userAgent, 0));

    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
    requestLocked = TRUE;

    // Lock the startup mutex so the created thread will wait until we are done with bookkeeping
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    startLocked = TRUE;

    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                       pCurlApiCallbacks->shutdownLock);
    shutdownLocked = TRUE;
    CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &streamShuttingDown));
    pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
                                                                         pCurlApiCallbacks->shutdownLock);
    shutdownLocked = FALSE;
    CHK(!streamShuttingDown, STATUS_STREAM_BEING_SHUTDOWN);

    // Start the request/response thread
    CHK_STATUS(THREAD_CREATE(&threadId, tagResourceCurlHandler, (PVOID) pCurlRequest));
    CHK_STATUS(THREAD_DETACH(threadId));

    // Set the thread ID in the request, add to the hash table
    pCurlRequest->threadId = threadId;
    CHK_STATUS(hashTablePut(pCurlApiCallbacks->pActiveRequests, streamHandle, (UINT64) pCurlRequest));

CleanUp:

    if (STATUS_FAILED(retStatus)) {
        if (IS_VALID_TID_VALUE(threadId)) {
            THREAD_CANCEL(threadId);
        }

        freeCurlRequest(&pCurlRequest);
    } else if (startLocked) {
        // Release the lock to let the awaiting handler thread to continue
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    }

    if (shutdownLocked) {
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
    }

    if (requestLocked) {
        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
    }

    if (paramsJson != NULL) {
        MEMFREE(paramsJson);
    }

    if (tagsJson != NULL) {
        MEMFREE(tagsJson);
    }

    LEAVES();
    return retStatus;
}