in canary/producer-c/canary/KvsProducerSampleCloudwatch.cpp [289:563]
/**
 * Entry point of the Kinesis Video Streams producer canary.
 *
 * Flow, as implemented below:
 *   1. Load the canary configuration from the file given as argv[1], or from environment
 *      variables when no argument is supplied.
 *   2. Build the stream name, create the CloudWatch / CloudWatch Logs clients and try to
 *      initialize the CloudWatch logger (falling back to file logging when that fails).
 *   3. Create the device info, stream info, callback provider, auth callbacks, canary stream
 *      callbacks, the KVS client and the stream.
 *   4. Push dummy frames until the configured canary duration elapses or SIGINT is captured,
 *      publishing metrics at every key frame after the first one. In the intermittent scenario
 *      the loop alternates between random run and sleep windows of 1-10 minutes.
 *   5. Release every resource and shut the AWS SDK down.
 *
 * @param argc - number of command line arguments
 * @param argv - argv[1], when present, is the path to the canary config file
 *
 * @return EXIT_FAILURE when the final status is a failure, EXIT_SUCCESS otherwise
 */
INT32 main(INT32 argc, CHAR* argv[])
{
#ifndef _WIN32
    signal(SIGINT, sigintHandler);
#endif
    SET_INSTRUMENTED_ALLOCATORS();
    PDeviceInfo pDeviceInfo = NULL;
    PStreamInfo pStreamInfo = NULL;
    PClientCallbacks pClientCallbacks = NULL;
    CLIENT_HANDLE clientHandle = INVALID_CLIENT_HANDLE_VALUE;
    STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
    STATUS retStatus = STATUS_SUCCESS;
    PCHAR accessKey = NULL, secretKey = NULL, sessionToken = NULL, region = NULL, cacertPath = NULL, logLevel = NULL;
    CHAR streamName[MAX_STREAM_NAME_LEN + 1];
    Frame frame;
    UINT32 frameIndex = 0;
    UINT64 lastKeyFrameTimestamp = 0;
    CloudwatchLogsObject cloudwatchLogsObject;
    PCanaryStreamCallbacks pCanaryStreamCallbacks = NULL;
    UINT64 currentTime, canaryStopTime;
    BOOL cleanUpDone = FALSE;
    BOOL fileLoggingEnabled = FALSE;
    PAuthCallbacks pAuthCallbacks = NULL;
    CanaryConfig config;
    BOOL firstFrame = TRUE;
    UINT64 startTime;
    DOUBLE startUpLatency;
    UINT64 runTill = MAX_UINT64;
    UINT64 randomTime = 0;

    initializeEndianness();
    SRAND(time(0));

    Aws::SDKOptions options;
    Aws::InitAPI(options);
    // The AWS clients declared inside this scope are destroyed when the scope is left (normally or
    // through a jump to CleanUp), i.e. before Aws::ShutdownAPI() runs.
    {
        // Set before any CHK so that the clean up path can always SAFE_MEMFREE the frame buffer
        frame.frameData = NULL;

        if (argc < 2) {
            DLOGW("Optional Usage: %s <path-to-config-file>\n", argv[0]);
            DLOGD("Using environment variables now");
            DLOGD("Usage pattern:\n"
                  "\t\texport CANARY_STREAM_NAME=<val>\n"
                  "\t\texport CANARY_STREAM_TYPE=<realtime/offline>\n"
                  "\t\texport FRAGMENT_SIZE_IN_BYTES=<Size of fragment in bytes>\n"
                  "\t\texport CANARY_DURATION_IN_SECONDS=<duration in seconds>\n"
                  "\t\texport CANARY_BUFFER_DURATION_IN_SECONDS=<duration in seconds>\n"
                  "\t\texport CANARY_STORAGE_SIZE_IN_BYTES=<storage size in bytes>\n"
                  "\t\texport CANARY_LABEL=<canary label (longtime,periodic, etc >\n"
                  "\t\texport CANARY_RUN_SCENARIO=<canary scenario (normal/intermittent) >");
            CHK_STATUS(initWithEnvVars(&config));
        } else {
            CHK_ERR(STRLEN(argv[1]) < (MAX_PATH_LEN + 1), STATUS_INVALID_ARG_LEN, "File path length too long");
            CHK_STATUS(parseConfigFile(&config, argv[1]));
        }

        MEMSET(streamName, '\0', SIZEOF(streamName));
        cacertPath = getenv(CACERT_PATH_ENV_VAR);
        sessionToken = getenv(SESSION_TOKEN_ENV_VAR);

        if (config.useIotCredentialProvider) {
            // Bounded copy: the thing name comes from configuration and must not overrun streamName
            SNPRINTF(streamName, SIZEOF(streamName), "%s", config.iotThingName);
        } else {
            SNPRINTF(streamName, MAX_STREAM_NAME_LEN, "%s-%s-%s", config.streamNamePrefix, config.canaryTypeStr, config.canaryLabel);
        }

        if ((region = getenv(DEFAULT_REGION_ENV_VAR)) == NULL) {
            region = (PCHAR) DEFAULT_AWS_REGION;
        }

        Aws::Client::ClientConfiguration clientConfiguration;
        clientConfiguration.region = region;
        Aws::CloudWatch::CloudWatchClient cw(clientConfiguration);
        Aws::CloudWatchLogs::CloudWatchLogsClient cwl(clientConfiguration);

        STRCPY(cloudwatchLogsObject.logGroupName, "ProducerSDK");
        SNPRINTF(cloudwatchLogsObject.logStreamName, MAX_LOG_FILE_NAME_LEN, "%s-log-%llu", streamName,
                 GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
        cloudwatchLogsObject.pCwl = &cwl;
        if ((retStatus = initializeCloudwatchLogger(&cloudwatchLogsObject)) != STATUS_SUCCESS) {
            DLOGW("Cloudwatch logger failed to be initialized with 0x%08x error code. Fallback to file logging", retStatus);
            fileLoggingEnabled = TRUE;
        }

        // default storage size is 128MB. Use setDeviceInfoStorageSize after create to change storage size.
        CHK_STATUS(createDefaultDeviceInfo(&pDeviceInfo));
        if (config.storageSizeInBytes != 0) {
            CHK_STATUS(setDeviceInfoStorageSize(pDeviceInfo, config.storageSizeInBytes));
        }

        // adjust members of pDeviceInfo here if needed
        pDeviceInfo->clientInfo.loggerLogLevel = LOG_LEVEL_DEBUG;
        logLevel = getenv(DEBUG_LOG_LEVEL_ENV_VAR);
        if (logLevel != NULL && STATUS_FAILED(STRTOUI32(logLevel, NULL, 10, &pDeviceInfo->clientInfo.loggerLogLevel))) {
            // Do not silently run with whatever the failed parse left behind
            DLOGW("Could not parse log level \"%s\". Using the default log level", logLevel);
            pDeviceInfo->clientInfo.loggerLogLevel = LOG_LEVEL_DEBUG;
        }

        // Audio + video stream info is used only when the multi track type is configured
        if (STRCMP(config.canaryTrackType, CANARY_MULTI_TRACK_TYPE) == 0) {
            CHK_STATUS(createRealtimeAudioVideoStreamInfoProvider(streamName, DEFAULT_RETENTION_PERIOD, config.bufferDuration, &pStreamInfo));
        } else {
            CHK_STATUS(createRealtimeVideoStreamInfoProvider(streamName, DEFAULT_RETENTION_PERIOD, config.bufferDuration, &pStreamInfo));
        }
        adjustStreamInfoToCanaryType(pStreamInfo, config.canaryTypeStr);

        // adjust members of pStreamInfo here if needed
        pStreamInfo->streamCaps.nalAdaptationFlags = NAL_ADAPTATION_FLAG_NONE;

        // Reference point for the start up latency pushed after the first frame is put
        startTime = GETTIME();
        CHK_STATUS(createAbstractDefaultCallbacksProvider(DEFAULT_CALLBACK_CHAIN_COUNT, API_CALL_CACHE_TYPE_NONE,
                                                          ENDPOINT_UPDATE_PERIOD_SENTINEL_VALUE, region, config.canaryCpUrl, cacertPath, NULL, NULL,
                                                          &pClientCallbacks));

        if (config.useIotCredentialProvider) {
            CHK_STATUS(createIotAuthCallbacks(pClientCallbacks, (PCHAR) config.iotEndpoint, config.iotCoreCert, config.iotCorePrivateKey, cacertPath,
                                              config.iotCoreRoleAlias, streamName, &pAuthCallbacks));
        } else {
            if ((accessKey = getenv(ACCESS_KEY_ENV_VAR)) == NULL || (secretKey = getenv(SECRET_KEY_ENV_VAR)) == NULL) {
                DLOGE("Error missing credentials");
                CHK(FALSE, STATUS_INVALID_ARG);
            }
            CHK_STATUS(createStaticAuthCallbacks(pClientCallbacks, accessKey, secretKey, sessionToken, MAX_UINT64, &pAuthCallbacks));
        }

        // Passed by address below, so it only needs a defined starting value. It must not be derived
        // from pCanaryStreamCallbacks, which is still NULL here.
        PStreamCallbacks pStreamcallbacks = NULL;
        CHK_STATUS(createContinuousRetryStreamCallbacks(pClientCallbacks, &pStreamcallbacks));

        if (getenv(CANARY_APP_FILE_LOGGER) != NULL || fileLoggingEnabled) {
            // Assign first, compare afterwards, so that retStatus holds the real status code for the log
            retStatus = addFileLoggerPlatformCallbacksProvider(pClientCallbacks, CANARY_FILE_LOGGING_BUFFER_SIZE, CANARY_MAX_NUMBER_OF_LOG_FILES,
                                                               (PCHAR) FILE_LOGGER_LOG_FILE_DIRECTORY_PATH, TRUE);
            if (retStatus != STATUS_SUCCESS) {
                DLOGE("File logging enable option failed with 0x%08x error code\n", retStatus);
                fileLoggingEnabled = FALSE;
            } else {
                fileLoggingEnabled = TRUE;
            }
        }

        CHK_STATUS(createCanaryStreamCallbacks(&cw, streamName, config.canaryLabel, &pCanaryStreamCallbacks));
        CHK_STATUS(addStreamCallbacks(pClientCallbacks, &pCanaryStreamCallbacks->streamCallbacks));

        if (!fileLoggingEnabled) {
            pClientCallbacks->logPrintFn = cloudWatchLogger;
        }

        CHK_STATUS(createKinesisVideoClient(pDeviceInfo, pClientCallbacks, &clientHandle));
        CHK_STATUS(createKinesisVideoStreamSync(clientHandle, pStreamInfo, &streamHandle));

        // setup dummy frame
        frame.size = CANARY_METADATA_SIZE + config.fragmentSizeInBytes / DEFAULT_FPS_VALUE;
        frame.frameData = (PBYTE) MEMALLOC(frame.size);
        CHK(frame.frameData != NULL, STATUS_NOT_ENOUGH_MEMORY);
        frame.version = FRAME_CURRENT_VERSION;
        frame.trackId = DEFAULT_VIDEO_TRACK_ID;
        // NOTE(review): the per-frame sleep below uses HUNDREDS_OF_NANOS_IN_A_SECOND / DEFAULT_FPS_VALUE
        // while the duration uses the MILLISECOND constant - confirm this is intended
        frame.duration = HUNDREDS_OF_NANOS_IN_A_MILLISECOND / DEFAULT_FPS_VALUE;
        frame.decodingTs = GETTIME(); // current time
        frame.presentationTs = frame.decodingTs;

        currentTime = GETTIME();
        canaryStopTime = currentTime + (config.canaryDuration * HUNDREDS_OF_NANOS_IN_A_SECOND);
        UINT64 duration;

        DLOGD("Producer SDK Log file name: %s", cloudwatchLogsObject.logStreamName);
        printConfig(&config);

        // Check if we have continuous run or intermittent scenario
        if (STRCMP(config.canaryScenario, CANARY_INTERMITTENT_SCENARIO) == 0) {
            // Set up runTill. This will be used if canary is run under intermittent scenario
            randomTime = (RAND() % 10) + 1;
            runTill = GETTIME() + randomTime * HUNDREDS_OF_NANOS_IN_A_MINUTE;
            DLOGD("Intermittent run time is set to: %" PRIu64 " minutes", randomTime);
            pCanaryStreamCallbacks->aggregateMetrics = FALSE;
        }

        // Say, the canary needs to be stopped before designated canary run time, signal capture
        // must still be supported
        while (GETTIME() < canaryStopTime && ATOMIC_LOAD_BOOL(&sigCaptureInterrupt) != TRUE) {
            frame.index = frameIndex;
            frame.flags = frameIndex % DEFAULT_KEY_FRAME_INTERVAL == 0 ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
            createCanaryFrameData(&frame);

            if (frame.flags == FRAME_FLAG_KEY_FRAME) {
                // Every key frame except the very first one closes the previous fragment
                if (lastKeyFrameTimestamp != 0) {
                    canaryStreamRecordFragmentEndSendTime(pCanaryStreamCallbacks, lastKeyFrameTimestamp, frame.presentationTs);
                    publishMetrics(streamHandle, clientHandle, pCanaryStreamCallbacks);
                    duration = GETTIME() - currentTime;
                    // Logs and error rate are flushed once more than 60 seconds have passed since the last flush
                    if ((!fileLoggingEnabled) && (duration > (60 * HUNDREDS_OF_NANOS_IN_A_SECOND))) {
                        canaryStreamSendLogs(&cloudwatchLogsObject);
                        currentTime = GETTIME();
                        retStatus = publishErrorRate(streamHandle, pCanaryStreamCallbacks, duration);
                        if (STATUS_FAILED(retStatus)) {
                            DLOGW("Could not publish error rate. Failed with %08x", retStatus);
                        }
                    }
                }
                lastKeyFrameTimestamp = frame.presentationTs;
            }

            if (GETTIME() < runTill) {
                frame.trackId = DEFAULT_VIDEO_TRACK_ID;
                CHK_STATUS(putKinesisVideoFrame(streamHandle, &frame));

                // Send frame on another track only if we want to run multi track. For the sake of
                // multitrack, we use the same frame for video and audio and just modify the flags.
                if (STRCMP(config.canaryTrackType, CANARY_MULTI_TRACK_TYPE) == 0) {
                    frame.flags = FRAME_FLAG_NONE;
                    frame.trackId = DEFAULT_AUDIO_TRACK_ID;
                    CHK_STATUS(putKinesisVideoFrame(streamHandle, &frame));
                }
                THREAD_SLEEP(HUNDREDS_OF_NANOS_IN_A_SECOND / DEFAULT_FPS_VALUE);
            } else {
                // The run window of the intermittent scenario is over: pause, then open a new run window
                canaryStreamRecordFragmentEndSendTime(pCanaryStreamCallbacks, lastKeyFrameTimestamp, frame.presentationTs);
                DLOGD("Last frame type put before stopping: %s", (frame.flags == FRAME_FLAG_KEY_FRAME ? "Key Frame" : "Non key frame"));
                UINT64 sleepTime = ((RAND() % 10) + 1) * HUNDREDS_OF_NANOS_IN_A_MINUTE;
                DLOGD("Intermittent sleep time is set to: %" PRIu64 " minutes", sleepTime / HUNDREDS_OF_NANOS_IN_A_MINUTE);
                THREAD_SLEEP(sleepTime);

                // Reset runTill after 1 run of intermittent scenario
                randomTime = (RAND() % 10) + 1;
                DLOGD("Intermittent run time is set to: %" PRIu64 " minutes", randomTime);
                runTill = GETTIME() + randomTime * HUNDREDS_OF_NANOS_IN_A_MINUTE;
            }

            // We measure this after first call to ensure that the latency is measured after the first SUCCESSFUL
            // putKinesisVideoFrame() call
            if (firstFrame) {
                startUpLatency = (DOUBLE)(GETTIME() - startTime) / (DOUBLE) HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
                CHK_STATUS(pushStartUpLatency(pCanaryStreamCallbacks, startUpLatency));
                DLOGD("Start up latency: %lf ms", startUpLatency);
                firstFrame = FALSE;
            }

            frame.decodingTs = GETTIME(); // current time
            frame.presentationTs = frame.decodingTs;
            frameIndex++;
        }

        // Regular clean up, performed inside the scope so that the clean up logs can still be
        // pushed through the CloudWatch Logs client before it is destroyed
        CHK_LOG_ERR(retStatus);
        SAFE_MEMFREE(frame.frameData);
        freeDeviceInfo(&pDeviceInfo);
        freeStreamInfoProvider(&pStreamInfo);
        freeKinesisVideoStream(&streamHandle);
        freeKinesisVideoClient(&clientHandle);
        freeCallbacksProvider(&pClientCallbacks); // This will also take care of freeing canaryStreamCallbacks
        RESET_INSTRUMENTED_ALLOCATORS();
        DLOGI("CleanUp Done");
        cleanUpDone = TRUE;

        if (!fileLoggingEnabled) {
            // This is necessary to ensure that we do not lose the last set of logs
            canaryStreamSendLogSync(&cloudwatchLogsObject);
        }
    }

CleanUp:
    // NOTE(review): on the failure path the SDK is shut down before the KVS resources below are
    // released - confirm none of the free* calls depend on the AWS SDK being initialized
    Aws::ShutdownAPI(options);
    CHK_LOG_ERR(retStatus);

    // canaryStreamSendLogSync() will lead to segfault outside the block scope
    // https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/basic-use.html
    // The clean up related logs also need to be captured in cloudwatch logs. This flag
    // will cater to two scenarios, one, if any of the commands fail, this clean up is invoked
    // Second, it will cater to scenarios where a SIG is sent to exit the while loop above in
    // which case the clean up related logs will be captured as well.
    if (!cleanUpDone) {
        CHK_LOG_ERR(retStatus);
        SAFE_MEMFREE(frame.frameData);
        freeDeviceInfo(&pDeviceInfo);
        freeStreamInfoProvider(&pStreamInfo);
        freeKinesisVideoStream(&streamHandle);
        freeKinesisVideoClient(&clientHandle);
        freeCallbacksProvider(&pClientCallbacks); // This will also take care of freeing canaryStreamCallbacks
        RESET_INSTRUMENTED_ALLOCATORS();
        DLOGI("CleanUp Done");
    }

    DLOGI("Exiting application with status code: 0x%08x", retStatus);
    return STATUS_FAILED(retStatus) ? EXIT_FAILURE : EXIT_SUCCESS;
}