PVOID getStreamingEndpointCurlHandler()

in src/source/CurlApiCallbacks.c [1641:1769]


/**
 * Thread routine that executes a GetStreamingEndpoint API call and reports the result.
 *
 * Steps performed:
 *  1. Acquires and immediately releases the request's startup lock.
 *  2. Signs the request, sleeps until requestInfo.callAfter if it is still in the future,
 *     and issues the call through curlCallApi.
 *  3. Parses the JSON response and extracts the value that follows the "DataEndpoint" key.
 *  4. If an endpoint was extracted, stores it in pCachedEndpoints keyed by the stream handle.
 *  5. Hands the request to curlApiCallbacksShutdownActiveRequests and, unless the request
 *     is terminating, delivers the result through getStreamingEndpointResultEvent and
 *     notifyCallResult.
 *
 * @param arg - The PCurlRequest describing the call. The request, its pCurlApiCallbacks,
 *              the callbacks provider and pCurlResponse must all be non-NULL (CHECK'ed).
 *
 * @return The final STATUS value cast to PVOID. When the request is not terminating this is
 *         the status returned by getStreamingEndpointResultEvent.
 */
PVOID getStreamingEndpointCurlHandler(PVOID arg)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PCurlRequest pCurlRequest = (PCurlRequest) arg;
    PCurlApiCallbacks pCurlApiCallbacks = NULL;
    PCallbacksProvider pCallbacksProvider = NULL;
    PCurlResponse pCurlResponse = NULL;
    PEndpointTracker pEndpointTracker = NULL;
    // Initialized so that a failed hashTableGet never leaves us reading an indeterminate value
    UINT64 value = 0;
    PCHAR pResponseStr;
    jsmn_parser parser;
    jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
    UINT32 i, strLen, resultLen;
    INT32 tokenCount;
    // Set only once the endpoint has been copied out of the response
    BOOL endpointFound = FALSE;
    BOOL requestTerminating = FALSE;
    STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
    SERVICE_CALL_RESULT callResult = SERVICE_CALL_RESULT_NOT_SET;
    CHAR streamingEndpoint[MAX_URI_CHAR_LEN + 1];

    // The buffer is handed to the result event on every non-terminating path, including
    // failures, so it must always hold a valid (possibly empty) string.
    streamingEndpoint[0] = '\0';

    CHECK(pCurlRequest != NULL && pCurlRequest->pCurlApiCallbacks != NULL && pCurlRequest->pCurlApiCallbacks->pCallbacksProvider != NULL &&
          pCurlRequest->pCurlResponse != NULL);
    pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks;
    pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;

    // Acquire and release the startup lock to ensure the startup sequence is clear
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);

    // Sign the request
    CHK_STATUS(signAwsRequestInfo(&pCurlRequest->requestInfo));

    // Wait for the specified amount of time before calling the API
    if (pCurlRequest->requestInfo.currentTime < pCurlRequest->requestInfo.callAfter) {
        THREAD_SLEEP(pCurlRequest->requestInfo.callAfter - pCurlRequest->requestInfo.currentTime);
    }

    // Execute the request
    CHK_STATUS(curlCallApi(pCurlRequest));

    // Early exit with the current (successful) status if the request is being terminated.
    // No endpoint has been parsed at this point, so nothing will be cached below.
    CHK(!ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating), retStatus);

    // Set the response object
    pCurlResponse = pCurlRequest->pCurlResponse;

    // Get the response
    CHK(pCurlResponse->callInfo.callResult != SERVICE_CALL_RESULT_NOT_SET, STATUS_INVALID_OPERATION);
    pResponseStr = pCurlResponse->callInfo.responseData;
    resultLen = pCurlResponse->callInfo.responseDataLen;

    DLOGD("GetStreamingEndpoint API response: %.*s", resultLen, pResponseStr);

    // Parse the response
    jsmn_init(&parser);
    tokenCount = jsmn_parse(&parser, pResponseStr, resultLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
    CHK(tokenCount > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
    CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);

    // Loop through the tokens and extract the endpoint
    for (i = 1; i < (UINT32) tokenCount && !endpointFound; i++) {
        if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "DataEndpoint")) {
            // The value token must exist; otherwise tokens[i + 1] was never filled by the parser
            CHK(i + 1 < (UINT32) tokenCount, STATUS_INVALID_API_CALL_RETURN_JSON);

            strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
            CHK(strLen <= MAX_URI_CHAR_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
            STRNCPY(streamingEndpoint, pResponseStr + tokens[i + 1].start, strLen);
            streamingEndpoint[strLen] = '\0';

            // No need to iterate further
            endpointFound = TRUE;
        }
    }

    // Make sure we found the token
    CHK(endpointFound, STATUS_GET_STREAMING_ENDPOINT_CALL_FAILED);

CleanUp:

    // We need to store the endpoint in the cache, but only when one was actually parsed.
    // A successful status alone is not sufficient: the terminating early-exit above also
    // arrives here with STATUS_SUCCESS and an empty endpoint buffer.
    if (STATUS_SUCCEEDED(retStatus) && endpointFound) {
        pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->cachedEndpointsLock);

        // Attempt to retrieve the cached value
        retStatus = hashTableGet(pCurlApiCallbacks->pCachedEndpoints, (UINT64) pCurlRequest->streamHandle, &value);
        pEndpointTracker = (PEndpointTracker) value;

        if (STATUS_FAILED(retStatus) || pEndpointTracker == NULL) {
            // Create new tracker and insert in the table
            pEndpointTracker = (PEndpointTracker) MEMALLOC(SIZEOF(EndpointTracker));

            if (pEndpointTracker != NULL) {
                // Insert into the table
                // NOTE(review): if this put fails the freshly allocated tracker is not owned by
                // the table and appears to leak - confirm and release it on failure.
                retStatus = hashTablePut(pCurlApiCallbacks->pCachedEndpoints, (UINT64) pCurlRequest->streamHandle, (UINT64) pEndpointTracker);
            }
        }

        if (pEndpointTracker != NULL) {
            STRNCPY(pEndpointTracker->streamingEndpoint, streamingEndpoint, MAX_URI_CHAR_LEN);
            pEndpointTracker->streamingEndpoint[MAX_URI_CHAR_LEN] = '\0';
            pEndpointTracker->endpointLastUpdateTime = pCurlRequest->requestInfo.currentTime;
        }

        pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlApiCallbacks->cachedEndpointsLock);
    }

    // Preserve the values as we need to free the request before the event notification
    if (pCurlRequest->pCurlResponse != NULL) {
        callResult = pCurlRequest->pCurlResponse->callInfo.callResult;
    }

    // Set the thread id just before returning
    pCurlRequest->threadId = INVALID_TID_VALUE;

    streamHandle = pCurlRequest->streamHandle;

    // Free the request object. The terminating flag is sampled first because pCurlRequest
    // is not touched again after the shutdown call.
    requestTerminating = ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating);
    curlApiCallbacksShutdownActiveRequests(pCurlRequest->pCurlApiCallbacks, pCurlRequest->streamHandle, CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, TRUE,
                                           FALSE);

    if (!requestTerminating) {
        // streamingEndpoint is an empty string here when the call or the parsing failed
        retStatus = getStreamingEndpointResultEvent(streamHandle, callResult, streamingEndpoint);
        // Bubble the notification to potential listeners
        notifyCallResult(pCallbacksProvider, retStatus, streamHandle);
    }

    LEAVES();

    // Returning STATUS as PVOID casting first to ptr type to avoid compiler warnings on 64bit platforms.
    return (PVOID)(ULONG_PTR) retStatus;
}