in src/source/CurlApiCallbacks.c [1641:1769]
PVOID getStreamingEndpointCurlHandler(PVOID arg)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PCurlRequest pCurlRequest = (PCurlRequest) arg;
PCurlApiCallbacks pCurlApiCallbacks = NULL;
PCallbacksProvider pCallbacksProvider = NULL;
PCurlResponse pCurlResponse = NULL;
PEndpointTracker pEndpointTracker = NULL;
UINT64 value;
PCHAR pResponseStr;
jsmn_parser parser;
jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
UINT32 i, strLen, resultLen;
INT32 tokenCount;
BOOL stopLoop, requestTerminating = FALSE;
STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
SERVICE_CALL_RESULT callResult = SERVICE_CALL_RESULT_NOT_SET;
CHAR streamingEndpoint[MAX_URI_CHAR_LEN + 1];
CHECK(pCurlRequest != NULL && pCurlRequest->pCurlApiCallbacks != NULL && pCurlRequest->pCurlApiCallbacks->pCallbacksProvider != NULL &&
pCurlRequest->pCurlResponse != NULL);
pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks;
pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;
// Acquire and release the startup lock to ensure the startup sequence is clear
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
// Sign the request
CHK_STATUS(signAwsRequestInfo(&pCurlRequest->requestInfo));
// Wait for the specified amount of time before calling the API
if (pCurlRequest->requestInfo.currentTime < pCurlRequest->requestInfo.callAfter) {
THREAD_SLEEP(pCurlRequest->requestInfo.callAfter - pCurlRequest->requestInfo.currentTime);
}
// Execute the request
CHK_STATUS(curlCallApi(pCurlRequest));
CHK(!ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating), retStatus);
// Set the response object
pCurlResponse = pCurlRequest->pCurlResponse;
// Get the response
CHK(pCurlResponse->callInfo.callResult != SERVICE_CALL_RESULT_NOT_SET, STATUS_INVALID_OPERATION);
pResponseStr = pCurlResponse->callInfo.responseData;
resultLen = pCurlResponse->callInfo.responseDataLen;
DLOGD("GetStreamingEndpoint API response: %.*s", resultLen, pResponseStr);
// Parse the response
jsmn_init(&parser);
tokenCount = jsmn_parse(&parser, pResponseStr, resultLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
CHK(tokenCount > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);
// Loop through the tokens and extract the endpoint
for (i = 1, stopLoop = FALSE; i < (UINT32) tokenCount && !stopLoop; i++) {
if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "DataEndpoint")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_URI_CHAR_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(streamingEndpoint, pResponseStr + tokens[i + 1].start, strLen);
streamingEndpoint[strLen] = '\0';
i++;
// No need to iterate further
stopLoop = TRUE;
}
}
// Make sure we found the token
CHK(stopLoop, STATUS_GET_STREAMING_ENDPOINT_CALL_FAILED);
CleanUp:
// We need to store the endpoint in the cache
if (STATUS_SUCCEEDED(retStatus)) {
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->cachedEndpointsLock);
// Attempt to retrieve the cached value
retStatus = hashTableGet(pCurlApiCallbacks->pCachedEndpoints, (UINT64) pCurlRequest->streamHandle, &value);
pEndpointTracker = (PEndpointTracker) value;
if (STATUS_FAILED(retStatus) || pEndpointTracker == NULL) {
// Create new tracker and insert in the table
pEndpointTracker = (PEndpointTracker) MEMALLOC(SIZEOF(EndpointTracker));
if (pEndpointTracker != NULL) {
// Insert into the table
retStatus = hashTablePut(pCurlApiCallbacks->pCachedEndpoints, (UINT64) pCurlRequest->streamHandle, (UINT64) pEndpointTracker);
}
}
if (pEndpointTracker != NULL) {
STRNCPY(pEndpointTracker->streamingEndpoint, streamingEndpoint, MAX_URI_CHAR_LEN);
pEndpointTracker->streamingEndpoint[MAX_URI_CHAR_LEN] = '\0';
pEndpointTracker->endpointLastUpdateTime = pCurlRequest->requestInfo.currentTime;
}
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->cachedEndpointsLock);
}
// Preserve the values as we need to free the request before the event notification
if (pCurlRequest->pCurlResponse != NULL) {
callResult = pCurlRequest->pCurlResponse->callInfo.callResult;
}
// Set the thread id just before returning
pCurlRequest->threadId = INVALID_TID_VALUE;
streamHandle = pCurlRequest->streamHandle;
// Free the request object
requestTerminating = ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating);
curlApiCallbacksShutdownActiveRequests(pCurlRequest->pCurlApiCallbacks, pCurlRequest->streamHandle, CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, TRUE,
FALSE);
if (!requestTerminating) {
retStatus = getStreamingEndpointResultEvent(streamHandle, callResult, streamingEndpoint);
// Bubble the notification to potential listeners
notifyCallResult(pCallbacksProvider, retStatus, streamHandle);
}
LEAVES();
// Returning STATUS as PVOID casting first to ptr type to avoid compiler warnings on 64bit platforms.
return (PVOID)(ULONG_PTR) retStatus;
}