in src/source/CurlApiCallbacks.c [1771:1889]
STATUS tagResourceCurl(UINT64 customData, PCHAR streamArn, UINT32 tagCount, PTag tags, PServiceCallContext pServiceCallContext)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PCHAR paramsJson = NULL;
PCHAR tagsJson = NULL;
CHAR url[MAX_PATH_LEN + 1];
PAwsCredentials pCredentials;
TID threadId = INVALID_TID_VALUE;
PCurlApiCallbacks pCurlApiCallbacks = (PCurlApiCallbacks) customData;
PCurlRequest pCurlRequest = NULL;
UINT32 i;
INT32 charsCopied;
PCHAR pCurPtr;
PCallbacksProvider pCallbacksProvider = NULL;
BOOL startLocked = FALSE, requestLocked = FALSE, shutdownLocked = FALSE, streamShuttingDown = FALSE;
UINT64 currentTime;
STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
CHK(pCurlApiCallbacks != NULL && pCurlApiCallbacks->pCallbacksProvider != NULL && pServiceCallContext != NULL, STATUS_INVALID_ARG);
pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;
streamHandle = (STREAM_HANDLE) pServiceCallContext->customData;
CHK(tagCount > 0 && tags != NULL, STATUS_INTERNAL_ERROR);
// Allocate enough space for the string manipulation. We don't want to reserve stack space for this
CHK(NULL != (paramsJson = (PCHAR) MEMALLOC(MAX_TAGS_JSON_PARAMETER_STRING_LEN)), STATUS_NOT_ENOUGH_MEMORY);
CHK(NULL != (tagsJson = (PCHAR) MEMALLOC(MAX_TAGS_JSON_PARAMETER_STRING_LEN)), STATUS_NOT_ENOUGH_MEMORY);
// Prepare the tags elements
for (i = 0, pCurPtr = tagsJson; i < tagCount; i++) {
charsCopied =
SNPRINTF(pCurPtr, MAX_TAGS_JSON_PARAMETER_STRING_LEN - (pCurPtr - tagsJson), TAG_PARAM_JSON_TEMPLATE, tags[i].name, tags[i].value);
CHK(charsCopied > 0 && charsCopied < MAX_TAGS_JSON_PARAMETER_STRING_LEN - (pCurPtr - tagsJson), STATUS_INTERNAL_ERROR);
pCurPtr += charsCopied;
}
// Remove the tailing comma
*(pCurPtr - 1) = '\0';
// Prepare the json params for the call
SNPRINTF(paramsJson, MAX_TAGS_JSON_PARAMETER_STRING_LEN, TAG_RESOURCE_PARAM_JSON_TEMPLATE, streamArn, tagsJson);
// Validate the credentials
CHK_STATUS(deserializeAwsCredentials(pServiceCallContext->pAuthInfo->data));
pCredentials = (PAwsCredentials) pServiceCallContext->pAuthInfo->data;
CHK(pCredentials->version <= AWS_CREDENTIALS_CURRENT_VERSION, STATUS_INVALID_AWS_CREDENTIALS_VERSION);
CHK(pCredentials->size == pServiceCallContext->pAuthInfo->size, STATUS_INTERNAL_ERROR);
// Create the API url
STRCPY(url, pCurlApiCallbacks->controlPlaneUrl);
STRCAT(url, TAG_RESOURCE_API_POSTFIX);
// Create a request object
currentTime = pCallbacksProvider->clientCallbacks.getCurrentTimeFn(pCallbacksProvider->clientCallbacks.customData);
CHK_STATUS(createCurlRequest(HTTP_REQUEST_VERB_POST, url, paramsJson, streamHandle, pCurlApiCallbacks->region, currentTime,
CURL_API_DEFAULT_CONNECTION_TIMEOUT, pServiceCallContext->timeout, pServiceCallContext->callAfter,
pCurlApiCallbacks->certPath, pCredentials, pCurlApiCallbacks, &pCurlRequest));
// Set the necessary headers
CHK_STATUS(setRequestHeader(&pCurlRequest->requestInfo, (PCHAR) "user-agent", 0, pCurlApiCallbacks->userAgent, 0));
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
requestLocked = TRUE;
// Lock the startup mutex so the created thread will wait until we are done with bookkeeping
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
startLocked = TRUE;
pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.lockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
pCurlApiCallbacks->shutdownLock);
shutdownLocked = TRUE;
CHK_STATUS(hashTableContains(pCurlApiCallbacks->pStreamsShuttingDown, streamHandle, &streamShuttingDown));
pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.unlockMutexFn(pCurlApiCallbacks->pCallbacksProvider->clientCallbacks.customData,
pCurlApiCallbacks->shutdownLock);
shutdownLocked = FALSE;
CHK(!streamShuttingDown, STATUS_STREAM_BEING_SHUTDOWN);
// Start the request/response thread
CHK_STATUS(THREAD_CREATE(&threadId, tagResourceCurlHandler, (PVOID) pCurlRequest));
CHK_STATUS(THREAD_DETACH(threadId));
// Set the thread ID in the request, add to the hash table
pCurlRequest->threadId = threadId;
CHK_STATUS(hashTablePut(pCurlApiCallbacks->pActiveRequests, streamHandle, (UINT64) pCurlRequest));
CleanUp:
if (STATUS_FAILED(retStatus)) {
if (IS_VALID_TID_VALUE(threadId)) {
THREAD_CANCEL(threadId);
}
freeCurlRequest(&pCurlRequest);
} else if (startLocked) {
// Release the lock to let the awaiting handler thread to continue
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
}
if (shutdownLocked) {
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->shutdownLock);
}
if (requestLocked) {
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->activeRequestsLock);
}
if (paramsJson != NULL) {
MEMFREE(paramsJson);
}
if (tagsJson != NULL) {
MEMFREE(tagsJson);
}
LEAVES();
return retStatus;
}