PVOID describeStreamCurlHandler()

in src/source/CurlApiCallbacks.c [1302:1463]


/**
 * Handler that executes a DescribeStream API call and reports its outcome.
 *
 * The handler signs and executes the request carried by arg, parses the "StreamInfo" object of the
 * JSON response into a StreamDescription, releases the request through
 * curlApiCallbacksShutdownActiveRequests and, unless the request is terminating, forwards the
 * result through describeStreamResultEvent and notifyCallResult.
 *
 * @param arg - The PCurlRequest describing the call. The request, its pCurlApiCallbacks, the
 *              callbacks provider and its pCurlResponse must not be NULL (enforced with CHECK).
 *
 * @return The resulting STATUS value cast to PVOID.
 */
PVOID describeStreamCurlHandler(PVOID arg)
{
    ENTERS();
    STATUS retStatus = STATUS_SUCCESS;
    PCurlRequest pCurlRequest = (PCurlRequest) arg;
    PCurlApiCallbacks pCurlApiCallbacks = NULL;
    PCallbacksProvider pCallbacksProvider = NULL;
    PCurlResponse pCurlResponse = NULL;
    PCHAR pResponseStr;
    jsmn_parser parser;
    jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
    UINT32 i, strLen, resultLen;
    INT32 tokenCount;
    UINT64 retention;
    BOOL jsonInStreamDescription = FALSE, requestTerminating = FALSE;
    StreamDescription streamDescription;
    STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
    SERVICE_CALL_RESULT callResult = SERVICE_CALL_RESULT_NOT_SET;

    // Null out the description up-front. The CleanUp section passes it to describeStreamResultEvent
    // on every non-terminating path, including the ones that bail out before any parsing happens.
    MEMSET(&streamDescription, 0x00, SIZEOF(StreamDescription));

    CHECK(pCurlRequest != NULL && pCurlRequest->pCurlApiCallbacks != NULL && pCurlRequest->pCurlApiCallbacks->pCallbacksProvider != NULL &&
          pCurlRequest->pCurlResponse != NULL);
    pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks;
    pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;

    // Acquire and release the startup lock to ensure the startup sequence is clear
    pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
    pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);

    // Sign the request
    CHK_STATUS(signAwsRequestInfo(&pCurlRequest->requestInfo));

    // Wait for the specified amount of time before calling the API
    if (pCurlRequest->requestInfo.currentTime < pCurlRequest->requestInfo.callAfter) {
        THREAD_SLEEP(pCurlRequest->requestInfo.callAfter - pCurlRequest->requestInfo.currentTime);
    }

    // Execute the request
    CHK_STATUS(curlCallApi(pCurlRequest));

    // Early exit without altering the status if the request is being terminated
    CHK(!ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating), retStatus);

    // Set the response object
    pCurlResponse = pCurlRequest->pCurlResponse;

    // Get the response
    CHK(pCurlResponse->callInfo.callResult != SERVICE_CALL_RESULT_NOT_SET, STATUS_INVALID_OPERATION);
    pResponseStr = pCurlResponse->callInfo.responseData;
    resultLen = pCurlResponse->callInfo.responseDataLen;

    DLOGD("DescribeStream API response: %.*s", resultLen, pResponseStr);

    // skip json parsing if call result not ok
    CHK(pCurlResponse->callInfo.callResult == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL, retStatus);

    // Parse the response. A negative token count signals a parse error and is rejected below.
    jsmn_init(&parser);
    tokenCount = jsmn_parse(&parser, pResponseStr, resultLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
    CHK(tokenCount > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
    CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);

    // Loop through the tokens and extract the stream description
    for (i = 1; i < (UINT32) tokenCount; i++) {
        if (!jsonInStreamDescription) {
            if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "StreamInfo")) {
                streamDescription.version = STREAM_DESCRIPTION_CURRENT_VERSION;
                jsonInStreamDescription = TRUE;

                // Skip the token following the "StreamInfo" key
                i++;
            }
        } else {
            // Every key handled below reads its value from the following token. A string that merely
            // looks like a key (for example the value of an unrecognized key) can be the very last
            // token, in which case there is no value token to read and nothing left to parse.
            if (i + 1 >= (UINT32) tokenCount) {
                break;
            }

            if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "DeviceName")) {
                strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
                CHK(strLen <= MAX_DEVICE_NAME_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(streamDescription.deviceName, pResponseStr + tokens[i + 1].start, strLen);
                // Terminate right after the copied value so a repeated key can't leave stale characters
                streamDescription.deviceName[strLen] = '\0';
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "MediaType")) {
                strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
                CHK(strLen <= MAX_CONTENT_TYPE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(streamDescription.contentType, pResponseStr + tokens[i + 1].start, strLen);
                streamDescription.contentType[strLen] = '\0';
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "KmsKeyId")) {
                strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
                CHK(strLen <= MAX_ARN_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(streamDescription.kmsKeyId, pResponseStr + tokens[i + 1].start, strLen);
                streamDescription.kmsKeyId[strLen] = '\0';
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "StreamARN")) {
                strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
                CHK(strLen <= MAX_ARN_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(streamDescription.streamArn, pResponseStr + tokens[i + 1].start, strLen);
                streamDescription.streamArn[strLen] = '\0';
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "StreamName")) {
                strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
                CHK(strLen <= MAX_STREAM_NAME_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(streamDescription.streamName, pResponseStr + tokens[i + 1].start, strLen);
                streamDescription.streamName[strLen] = '\0';
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "Version")) {
                strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
                CHK(strLen <= MAX_UPDATE_VERSION_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                STRNCPY(streamDescription.updateVersion, pResponseStr + tokens[i + 1].start, strLen);
                streamDescription.updateVersion[strLen] = '\0';
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "Status")) {
                strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
                CHK(strLen <= MAX_DESCRIBE_STREAM_STATUS_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
                streamDescription.streamStatus = getStreamStatusFromString(pResponseStr + tokens[i + 1].start, strLen);
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "DataRetentionInHours")) {
                CHK_STATUS(STRTOUI64(pResponseStr + tokens[i + 1].start, pResponseStr + tokens[i + 1].end, 10, &retention));

                // NOTE: Retention value is in hours
                streamDescription.retention = retention * HUNDREDS_OF_NANOS_IN_AN_HOUR;
                i++;
            } else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "CreationTime")) {
                // TODO: In the future parse out the creation time but currently we don't need it
                i++;
            }
        }
    }

CleanUp:

    // Preserve the values as we need to free the request before the event notification
    if (pCurlRequest->pCurlResponse != NULL) {
        callResult = pCurlRequest->pCurlResponse->callInfo.callResult;
    }

    // Set the thread id just before returning
    pCurlRequest->threadId = INVALID_TID_VALUE;

    streamHandle = pCurlRequest->streamHandle;

    // Free the request object. The terminating flag is read first as the request is not used afterwards.
    requestTerminating = ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating);
    curlApiCallbacksShutdownActiveRequests(pCurlRequest->pCurlApiCallbacks, pCurlRequest->streamHandle, CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, TRUE,
                                           FALSE);
    if (!requestTerminating) {
        if (callResult == SERVICE_CALL_RESULT_OK && !jsonInStreamDescription) {
            // The call succeeded but no "StreamInfo" was found - notify PIC with the invalid result
            describeStreamResultEvent(streamHandle, SERVICE_CALL_INVALID_ARG, &streamDescription);

            // Overwrite the result with more precise info
            retStatus = STATUS_INVALID_DESCRIBE_STREAM_RETURN_JSON;
        } else {
            // Notify PIC with the result
            retStatus = describeStreamResultEvent(streamHandle, callResult, &streamDescription);
        }

        // Bubble the notification to potential listeners
        notifyCallResult(pCallbacksProvider, retStatus, streamHandle);
    }

    LEAVES();

    // Returning STATUS as PVOID casting first to ptr type to avoid compiler warnings on 64bit platforms.
    return (PVOID)(ULONG_PTR) retStatus;
}