in src/source/CurlApiCallbacks.c [1302:1463]
PVOID describeStreamCurlHandler(PVOID arg)
{
ENTERS();
STATUS retStatus = STATUS_SUCCESS;
PCurlRequest pCurlRequest = (PCurlRequest) arg;
PCurlApiCallbacks pCurlApiCallbacks = NULL;
PCallbacksProvider pCallbacksProvider = NULL;
PCurlResponse pCurlResponse = NULL;
PCHAR pResponseStr;
jsmn_parser parser;
jsmntok_t tokens[MAX_JSON_TOKEN_COUNT];
UINT32 i, strLen, resultLen;
INT32 tokenCount;
UINT64 retention;
BOOL jsonInStreamDescription = FALSE, requestTerminating = FALSE;
StreamDescription streamDescription;
STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
SERVICE_CALL_RESULT callResult = SERVICE_CALL_RESULT_NOT_SET;
CHECK(pCurlRequest != NULL && pCurlRequest->pCurlApiCallbacks != NULL && pCurlRequest->pCurlApiCallbacks->pCallbacksProvider != NULL &&
pCurlRequest->pCurlResponse != NULL);
pCurlApiCallbacks = pCurlRequest->pCurlApiCallbacks;
pCallbacksProvider = pCurlApiCallbacks->pCallbacksProvider;
// Acquire and release the startup lock to ensure the startup sequence is clear
pCallbacksProvider->clientCallbacks.lockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
pCallbacksProvider->clientCallbacks.unlockMutexFn(pCallbacksProvider->clientCallbacks.customData, pCurlRequest->startLock);
// Sign the request
CHK_STATUS(signAwsRequestInfo(&pCurlRequest->requestInfo));
// Wait for the specified amount of time before calling the API
if (pCurlRequest->requestInfo.currentTime < pCurlRequest->requestInfo.callAfter) {
THREAD_SLEEP(pCurlRequest->requestInfo.callAfter - pCurlRequest->requestInfo.currentTime);
}
// Execute the request
CHK_STATUS(curlCallApi(pCurlRequest));
CHK(!ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating), retStatus);
// Set the response object
pCurlResponse = pCurlRequest->pCurlResponse;
// Get the response
CHK(pCurlResponse->callInfo.callResult != SERVICE_CALL_RESULT_NOT_SET, STATUS_INVALID_OPERATION);
pResponseStr = pCurlResponse->callInfo.responseData;
resultLen = pCurlResponse->callInfo.responseDataLen;
DLOGD("DescribeStream API response: %.*s", resultLen, pResponseStr);
// skip json parsing if call result not ok
CHK(pCurlResponse->callInfo.callResult == SERVICE_CALL_RESULT_OK && resultLen != 0 && pResponseStr != NULL, retStatus);
// Parse the response
jsmn_init(&parser);
tokenCount = jsmn_parse(&parser, pResponseStr, resultLen, tokens, SIZEOF(tokens) / SIZEOF(jsmntok_t));
CHK(tokenCount > 1, STATUS_INVALID_API_CALL_RETURN_JSON);
CHK(tokens[0].type == JSMN_OBJECT, STATUS_INVALID_API_CALL_RETURN_JSON);
// Null out the fields before processing
MEMSET(&streamDescription, 0x00, SIZEOF(StreamDescription));
// Loop through the tokens and extract the stream description
for (i = 1; i < (UINT32) tokenCount; i++) {
if (!jsonInStreamDescription) {
if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "StreamInfo")) {
streamDescription.version = STREAM_DESCRIPTION_CURRENT_VERSION;
jsonInStreamDescription = TRUE;
i++;
}
} else {
if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "DeviceName")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_DEVICE_NAME_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(streamDescription.deviceName, pResponseStr + tokens[i + 1].start, strLen);
streamDescription.deviceName[MAX_DEVICE_NAME_LEN] = '\0';
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "MediaType")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_CONTENT_TYPE_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(streamDescription.contentType, pResponseStr + tokens[i + 1].start, strLen);
streamDescription.contentType[MAX_CONTENT_TYPE_LEN] = '\0';
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "KmsKeyId")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_ARN_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(streamDescription.kmsKeyId, pResponseStr + tokens[i + 1].start, strLen);
streamDescription.kmsKeyId[MAX_ARN_LEN] = '\0';
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "StreamARN")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_ARN_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(streamDescription.streamArn, pResponseStr + tokens[i + 1].start, strLen);
streamDescription.streamArn[MAX_ARN_LEN] = '\0';
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "StreamName")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_STREAM_NAME_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(streamDescription.streamName, pResponseStr + tokens[i + 1].start, strLen);
streamDescription.streamName[MAX_STREAM_NAME_LEN] = '\0';
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "Version")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_UPDATE_VERSION_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
STRNCPY(streamDescription.updateVersion, pResponseStr + tokens[i + 1].start, strLen);
streamDescription.updateVersion[MAX_UPDATE_VERSION_LEN] = '\0';
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "Status")) {
strLen = (UINT32)(tokens[i + 1].end - tokens[i + 1].start);
CHK(strLen <= MAX_DESCRIBE_STREAM_STATUS_LEN, STATUS_INVALID_API_CALL_RETURN_JSON);
streamDescription.streamStatus = getStreamStatusFromString(pResponseStr + tokens[i + 1].start, strLen);
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "DataRetentionInHours")) {
CHK_STATUS(STRTOUI64(pResponseStr + tokens[i + 1].start, pResponseStr + tokens[i + 1].end, 10, &retention));
// NOTE: Retention value is in hours
streamDescription.retention = retention * HUNDREDS_OF_NANOS_IN_AN_HOUR;
i++;
} else if (compareJsonString(pResponseStr, &tokens[i], JSMN_STRING, (PCHAR) "CreationTime")) {
// TODO: In the future parse out the creation time but currently we don't need it
i++;
}
}
}
CleanUp:
// Preserve the values as we need to free the request before the event notification
if (pCurlRequest->pCurlResponse != NULL) {
callResult = pCurlRequest->pCurlResponse->callInfo.callResult;
}
// Set the thread id just before returning
pCurlRequest->threadId = INVALID_TID_VALUE;
streamHandle = pCurlRequest->streamHandle;
// Free the request object
requestTerminating = ATOMIC_LOAD_BOOL(&pCurlRequest->requestInfo.terminating);
curlApiCallbacksShutdownActiveRequests(pCurlRequest->pCurlApiCallbacks, pCurlRequest->streamHandle, CURL_API_CALLBACKS_SHUTDOWN_TIMEOUT, TRUE,
FALSE);
if (!requestTerminating) {
if (callResult == SERVICE_CALL_RESULT_OK && !jsonInStreamDescription) {
// Notify PIC with the invalid result
describeStreamResultEvent(streamHandle, SERVICE_CALL_INVALID_ARG, &streamDescription);
// Overwrite the result with more precise info
retStatus = STATUS_INVALID_DESCRIBE_STREAM_RETURN_JSON;
} else {
// Notify PIC with the result
retStatus = describeStreamResultEvent(streamHandle, callResult, &streamDescription);
}
// Bubble the notification to potential listeners
notifyCallResult(pCallbacksProvider, retStatus, streamHandle);
}
LEAVES();
// Returning STATUS as PVOID casting first to ptr type to avoid compiler warnings on 64bit platforms.
return (PVOID)(ULONG_PTR) retStatus;
}