in src/source/CurlApiCallbacks.c [1056:1155]
/**
 * Executes a prepared CreateStream curl request and reports the outcome.
 *
 * Steps performed:
 *  1. Acquires and immediately releases the request's start lock so the handler
 *     does not proceed until the startup sequence has released it.
 *  2. Signs the request, optionally sleeps until requestInfo.callAfter, and
 *     executes the call through curlCallApi.
 *  3. Parses the JSON response and extracts the "StreamARN" value.
 *  4. In the cleanup section, captures the call result and stream handle,
 *     releases the request through curlApiCallbacksShutdownActiveRequests and,
 *     unless the request is terminating, delivers the result through
 *     createStreamResultEvent and notifyCallResult.
 *
 * @param arg - PCurlRequest passed as PVOID. The request, its pCurlApiCallbacks,
 *              that object's pCallbacksProvider and the request's pCurlResponse
 *              must all be non-NULL (enforced with CHECK).
 *
 * @return STATUS value cast to PVOID. When the request is not terminating this is
 *         the status returned by createStreamResultEvent, which replaces any
 *         earlier failure status (the earlier status is still logged by CHK_LOG_ERR).
 */
PVOID createStreamCurlHandler(PVOID arg)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PCurlRequest pCurlRequest = (PCurlRequest) arg;
    PCurlApiCallbacks pCurlApiCallbacks = NULL;
    PCallbacksProvider pCallbacksProvider = NULL;
    PCurlResponse pCurlResponse = NULL;
    CHAR streamArn[MAX_ARN_LEN + 1];
    PCHAR pResponseStr;
    jsmn_parser parser;
    jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
    UINT32 i, strLen, resultLen;
    INT32 tokenCount;
    BOOL requestTerminating;
    STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
    SERVICE_CALL_RESULT callResult = SERVICE_CALL_RESULT_NOT_SET;

    CHECK(pCurlRequest != NULL && pCurlRequest->pCurlApiCallbacks != NULL && pCurlRequest->pCurlApiCallbacks->pCallbacksProvider != NULL &&
          pCurlRequest->pCurlResponse != NULL);
    pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks;
    pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;

    // Start with an empty ARN so the cleanup path always passes a valid, terminated string
    streamArn[0] = '\0';

    DLOGD("createStreamCurlHandler response %p", pCurlRequest->pCurlResponse);

    // Acquire and release the startup lock to ensure the startup sequence is clear
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);

    // Sign the request
    CHK_STATUS(signAwsRequestInfo(&pCurlRequest->requestInfo));

    // Wait for the specified amount of time before calling the API
    if (pCurlRequest->requestInfo.currentTime < pCurlRequest->requestInfo.callAfter) {
        THREAD_SLEEP(pCurlRequest->requestInfo.callAfter - pCurlRequest->requestInfo.currentTime);
    }

    // Execute the request
    CHK_STATUS(curlCallApi(pCurlRequest));

    // Early exit (keeping the current status) if the request is being terminated
    CHK(!ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating), retStatus);

    // Set the response object
    pCurlResponse = pCurlRequest->pCurlResponse;

    // Get the response
    CHK(pCurlResponse->callInfo.callResult != SERVICE_CALL_RESULT_NOT_SET, STATUS_INVALID_OPERATION);
    pResponseStr = pCurlResponse->callInfo.responseData;
    resultLen = pCurlResponse->callInfo.responseDataLen;

    DLOGD("CreateStream API response: %.*s", resultLen, pResponseStr);

    // Parse the response
    jsmn_init(&parser);
    tokenCount = jsmn_parse(&parser, pResponseStr, resultLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
    CHK(tokenCount > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
    CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);

    // Token 0 is the enclosing object; scan the remaining tokens for the StreamARN key
    for (i = 1; i < (UINT32) tokenCount; i++) {
        if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "StreamARN")) {
            // The value token must exist within the parsed range. Without this check a
            // "StreamARN" string appearing as the last token would cause a read of an
            // unparsed (or out-of-array) token and a copy from an arbitrary offset.
            CHK(i + 1 < (UINT32) tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);

            strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_ARN_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
            STRNCPY(streamArn, pResponseStr + tokens[i + 1].start, strLen);

            // STRNCPY does not terminate when the source is longer than strLen
            streamArn[strLen] = '\0';

            // No need to iterate further
            break;
        }
    }

CleanUp:

    CHK_LOG_ERR(retStatus);

    // Preserve the values as we need to free the request before the event notification
    if (pCurlRequest->pCurlResponse != NULL) {
        callResult = pCurlRequest->pCurlResponse->callInfo.callResult;
    }

    // Set the thread id just before returning
    pCurlRequest->threadId = INVALID_TID_VALUE;

    streamHandle = pCurlRequest->streamHandle;

    // Free the request object. The terminating flag is sampled first because
    // pCurlRequest is not accessed after the shutdown call.
    requestTerminating = ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating);
    curlApiCallbacksShutdownActiveRequests(pCurlRequest->pCurlApiCallbacks, pCurlRequest->streamHandle, CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, TRUE,
                                           FALSE);

    if (!requestTerminating) {
        // Deliver the result using only the values captured above
        retStatus = createStreamResultEvent(streamHandle, callResult, streamArn);

        // Bubble the notification to potential listeners
        notifyCallResult(pCallbacksProvider, retStatus, streamHandle);
    }

    LEAVES();

    // Returning STATUS as PVOID casting first to ptr type to avoid compiler warnings on 64bit platforms.
    return (PVOID)(ULONG_PTR) retStatus;
}