INT32 main()

in canary/producer-c/canary/KvsProducerSampleCloudwatch.cpp [289:563]


/**
 * Canary entry point. Builds a Kinesis Video client and stream, then repeatedly puts synthetic
 * frames while publishing metrics and logs, until the configured canary duration elapses or the
 * sigCaptureInterrupt flag is set.
 *
 * Configuration is parsed from the file named by argv[1] when it is supplied; otherwise it is
 * loaded through initWithEnvVars().
 *
 * @param argc - number of command line arguments
 * @param argv - argument vector; argv[1], when present, is the path to the canary config file
 *
 * @return EXIT_SUCCESS when the final status is a success status, EXIT_FAILURE otherwise
 */
INT32 main(INT32 argc, CHAR* argv[])
{
#ifndef _WIN32
    signal(SIGINT, sigintHandler);
#endif
    SET_INSTRUMENTED_ALLOCATORS();
    PDeviceInfo pDeviceInfo = NULL;
    PStreamInfo pStreamInfo = NULL;

    PClientCallbacks pClientCallbacks = NULL;
    CLIENT_HANDLE clientHandle = INVALID_CLIENT_HANDLE_VALUE;
    STREAM_HANDLE streamHandle = INVALID_STREAM_HANDLE_VALUE;
    STATUS retStatus = STATUS_SUCCESS;
    PCHAR accessKey = NULL, secretKey = NULL, sessionToken = NULL, region = NULL, cacertPath = NULL, logLevel = NULL;
    CHAR streamName[MAX_STREAM_NAME_LEN + 1];
    Frame frame;
    UINT32 frameIndex = 0;
    UINT64 lastKeyFrameTimestamp = 0;
    CloudwatchLogsObject cloudwatchLogsObject;
    PCanaryStreamCallbacks pCanaryStreamCallbacks = NULL;
    UINT64 currentTime, canaryStopTime;
    BOOL cleanUpDone = FALSE;
    BOOL fileLoggingEnabled = FALSE;
    PAuthCallbacks pAuthCallbacks = NULL;
    CanaryConfig config;
    BOOL firstFrame = TRUE;
    UINT64 startTime;
    DOUBLE startUpLatency;
    UINT64 runTill = MAX_UINT64;
    UINT64 randomTime = 0;

    initializeEndianness();
    SRAND(time(0));
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    // Everything that uses the AWS SDK clients lives inside this block so that the clients are
    // destroyed before Aws::ShutdownAPI() runs at the CleanUp label.
    {
        // Must be set before the first CHK* so that the CleanUp path can safely free it
        frame.frameData = NULL;

        if (argc < 2) {
            DLOGW("Optional Usage: %s <path-to-config-file>\n", argv[0]);
            DLOGD("Using environment variables now");
            DLOGD("Usage pattern:\n"
                  "\t\texport CANARY_STREAM_NAME=<val>\n"
                  "\t\texport CANARY_STREAM_TYPE=<realtime/offline>\n"
                  "\t\texport FRAGMENT_SIZE_IN_BYTES=<Size of fragment in bytes>\n"
                  "\t\texport CANARY_DURATION_IN_SECONDS=<duration in seconds>\n"
                  "\t\texport CANARY_BUFFER_DURATION_IN_SECONDS=<duration in seconds>\n"
                  "\t\texport CANARY_STORAGE_SIZE_IN_BYTES=<storage size in bytes>\n"
                  "\t\texport CANARY_LABEL=<canary label (longtime,periodic, etc >\n"
                  "\t\texport CANARY_RUN_SCENARIO=<canary label (normal/intermittent) >");
            CHK_STATUS(initWithEnvVars(&config));
        } else {
            CHK_ERR(STRLEN(argv[1]) < (MAX_PATH_LEN + 1), STATUS_INVALID_ARG_LEN, "File path length too long");
            CHK_STATUS(parseConfigFile(&config, argv[1]));
        }

        MEMSET(streamName, '\0', SIZEOF(streamName));
        cacertPath = getenv(CACERT_PATH_ENV_VAR);
        sessionToken = getenv(SESSION_TOKEN_ENV_VAR);

        // The stream is named after the IoT thing when IoT credentials are used; otherwise the
        // name is composed from the configured prefix, canary type and label.
        if (config.useIotCredentialProvider) {
            // Bounded copy so an over-long thing name cannot overflow streamName
            SNPRINTF(streamName, SIZEOF(streamName), "%s", config.iotThingName);
        } else {
            SNPRINTF(streamName, MAX_STREAM_NAME_LEN, "%s-%s-%s", config.streamNamePrefix, config.canaryTypeStr, config.canaryLabel);
        }

        if ((region = getenv(DEFAULT_REGION_ENV_VAR)) == NULL) {
            region = (PCHAR) DEFAULT_AWS_REGION;
        }

        Aws::Client::ClientConfiguration clientConfiguration;
        clientConfiguration.region = region;
        Aws::CloudWatch::CloudWatchClient cw(clientConfiguration);

        Aws::CloudWatchLogs::CloudWatchLogsClient cwl(clientConfiguration);

        STRCPY(cloudwatchLogsObject.logGroupName, "ProducerSDK");
        SNPRINTF(cloudwatchLogsObject.logStreamName, MAX_LOG_FILE_NAME_LEN, "%s-log-%llu", streamName,
                 GETTIME() / HUNDREDS_OF_NANOS_IN_A_MILLISECOND);
        cloudwatchLogsObject.pCwl = &cwl;
        // A cloudwatch logger failure is not fatal: fall back to file logging instead
        if ((retStatus = initializeCloudwatchLogger(&cloudwatchLogsObject)) != STATUS_SUCCESS) {
            DLOGW("Cloudwatch logger failed to be initialized with 0x%08x error code. Fallback to file logging", retStatus);
            fileLoggingEnabled = TRUE;
        }

        // default storage size is 128MB. Use setDeviceInfoStorageSize after create to change storage size.
        CHK_STATUS(createDefaultDeviceInfo(&pDeviceInfo));

        if (config.storageSizeInBytes != 0) {
            CHK_STATUS(setDeviceInfoStorageSize(pDeviceInfo, config.storageSizeInBytes));
        }

        // adjust members of pDeviceInfo here if needed
        pDeviceInfo->clientInfo.loggerLogLevel = LOG_LEVEL_DEBUG;
        logLevel = getenv(DEBUG_LOG_LEVEL_ENV_VAR);
        if (logLevel != NULL) {
            // Best effort override of the log level; the conversion status is intentionally not checked
            STRTOUI32(logLevel, NULL, 10, &pDeviceInfo->clientInfo.loggerLogLevel);
        }

        // Run multitrack only for intermittent producer scenario
        if (STRCMP(config.canaryTrackType, CANARY_MULTI_TRACK_TYPE) == 0) {
            CHK_STATUS(createRealtimeAudioVideoStreamInfoProvider(streamName, DEFAULT_RETENTION_PERIOD, config.bufferDuration, &pStreamInfo));
        } else {
            CHK_STATUS(createRealtimeVideoStreamInfoProvider(streamName, DEFAULT_RETENTION_PERIOD, config.bufferDuration, &pStreamInfo));
        }
        adjustStreamInfoToCanaryType(pStreamInfo, config.canaryTypeStr);
        // adjust members of pStreamInfo here if needed
        pStreamInfo->streamCaps.nalAdaptationFlags = NAL_ADAPTATION_FLAG_NONE;

        // Start of the interval reported as start up latency after the first loop iteration
        startTime = GETTIME();
        CHK_STATUS(createAbstractDefaultCallbacksProvider(DEFAULT_CALLBACK_CHAIN_COUNT, API_CALL_CACHE_TYPE_NONE,
                                                          ENDPOINT_UPDATE_PERIOD_SENTINEL_VALUE, region, config.canaryCpUrl, cacertPath, NULL, NULL,
                                                          &pClientCallbacks));
        if (config.useIotCredentialProvider) {
            CHK_STATUS(createIotAuthCallbacks(pClientCallbacks, (PCHAR) config.iotEndpoint, config.iotCoreCert, config.iotCorePrivateKey, cacertPath,
                                              config.iotCoreRoleAlias, streamName, &pAuthCallbacks));

        } else {
            if ((accessKey = getenv(ACCESS_KEY_ENV_VAR)) == NULL || (secretKey = getenv(SECRET_KEY_ENV_VAR)) == NULL) {
                DLOGE("Error missing credentials");
                CHK(FALSE, STATUS_INVALID_ARG);
            }
            CHK_STATUS(createStaticAuthCallbacks(pClientCallbacks, accessKey, secretKey, sessionToken, MAX_UINT64, &pAuthCallbacks));
        }

        // Output parameter for the retry callbacks. It starts out NULL: pCanaryStreamCallbacks has
        // not been created yet at this point, so it must not be dereferenced to seed this pointer.
        PStreamCallbacks pStreamCallbacks = NULL;
        CHK_STATUS(createContinuousRetryStreamCallbacks(pClientCallbacks, &pStreamCallbacks));

        if (getenv(CANARY_APP_FILE_LOGGER) != NULL || fileLoggingEnabled) {
            // The assignment is parenthesized on its own so retStatus holds the returned status
            // code rather than the result of the comparison.
            if ((retStatus = addFileLoggerPlatformCallbacksProvider(pClientCallbacks, CANARY_FILE_LOGGING_BUFFER_SIZE, CANARY_MAX_NUMBER_OF_LOG_FILES,
                                                                    (PCHAR) FILE_LOGGER_LOG_FILE_DIRECTORY_PATH, TRUE)) != STATUS_SUCCESS) {
                DLOGE("File logging enable option failed with 0x%08x error code\n", retStatus);
                fileLoggingEnabled = FALSE;
            } else {
                fileLoggingEnabled = TRUE;
            }
        }

        CHK_STATUS(createCanaryStreamCallbacks(&cw, streamName, config.canaryLabel, &pCanaryStreamCallbacks));
        CHK_STATUS(addStreamCallbacks(pClientCallbacks, &pCanaryStreamCallbacks->streamCallbacks));

        // Without file logging, route log output through the cloudwatch logger
        if (!fileLoggingEnabled) {
            pClientCallbacks->logPrintFn = cloudWatchLogger;
        }

        CHK_STATUS(createKinesisVideoClient(pDeviceInfo, pClientCallbacks, &clientHandle));
        CHK_STATUS(createKinesisVideoStreamSync(clientHandle, pStreamInfo, &streamHandle));

        // setup dummy frame
        frame.size = CANARY_METADATA_SIZE + config.fragmentSizeInBytes / DEFAULT_FPS_VALUE;
        frame.frameData = (PBYTE) MEMALLOC(frame.size);
        CHK(frame.frameData != NULL, STATUS_NOT_ENOUGH_MEMORY);
        frame.version = FRAME_CURRENT_VERSION;
        frame.trackId = DEFAULT_VIDEO_TRACK_ID;
        // One frame period at DEFAULT_FPS_VALUE; uses the same expression as the inter-frame sleep in the loop below
        frame.duration = HUNDREDS_OF_NANOS_IN_A_SECOND / DEFAULT_FPS_VALUE;
        frame.decodingTs = GETTIME(); // current time
        frame.presentationTs = frame.decodingTs;
        currentTime = GETTIME();
        canaryStopTime = currentTime + (config.canaryDuration * HUNDREDS_OF_NANOS_IN_A_SECOND);
        UINT64 duration;

        DLOGD("Producer SDK Log file name: %s", cloudwatchLogsObject.logStreamName);

        printConfig(&config);

        // Check if we have continuous run or intermittent scenario
        if (STRCMP(config.canaryScenario, CANARY_INTERMITTENT_SCENARIO) == 0) {
            // Set up runTill. This will be used if canary is run under intermittent scenario
            randomTime = (RAND() % 10) + 1;
            runTill = GETTIME() + randomTime * HUNDREDS_OF_NANOS_IN_A_MINUTE;
            DLOGD("Intermittent run time is set to: %" PRIu64 " minutes", randomTime);
            pCanaryStreamCallbacks->aggregateMetrics = FALSE;
        }

        // Say, the canary needs to be stopped before designated canary run time, signal capture
        // must still be supported

        while (GETTIME() < canaryStopTime && ATOMIC_LOAD_BOOL(&sigCaptureInterrupt) != TRUE) {
            frame.index = frameIndex;
            frame.flags = frameIndex % DEFAULT_KEY_FRAME_INTERVAL == 0 ? FRAME_FLAG_KEY_FRAME : FRAME_FLAG_NONE;
            createCanaryFrameData(&frame);
            // A key frame marks a fragment boundary: record the end of the previous fragment and publish metrics
            if (frame.flags == FRAME_FLAG_KEY_FRAME) {
                if (lastKeyFrameTimestamp != 0) {
                    canaryStreamRecordFragmentEndSendTime(pCanaryStreamCallbacks, lastKeyFrameTimestamp, frame.presentationTs);
                    publishMetrics(streamHandle, clientHandle, pCanaryStreamCallbacks);
                    duration = GETTIME() - currentTime;
                    // Logs and error rate are pushed only after more than 60 seconds have passed since the last push
                    if ((!fileLoggingEnabled) && (duration > (60 * HUNDREDS_OF_NANOS_IN_A_SECOND))) {
                        canaryStreamSendLogs(&cloudwatchLogsObject);
                        currentTime = GETTIME();
                        retStatus = publishErrorRate(streamHandle, pCanaryStreamCallbacks, duration);
                        if (STATUS_FAILED(retStatus)) {
                            DLOGW("Could not publish error rate. Failed with %08x", retStatus);
                        }
                    }
                }
                lastKeyFrameTimestamp = frame.presentationTs;
            }

            // runTill stays at MAX_UINT64 unless the intermittent scenario was selected above
            if (GETTIME() < runTill) {
                frame.trackId = DEFAULT_VIDEO_TRACK_ID;
                CHK_STATUS(putKinesisVideoFrame(streamHandle, &frame));

                // Send frame on another track only if we want to run multi track. For the sake of
                // multitrack, we use the same frame for video and audio and just modify the flags.
                if (STRCMP(config.canaryTrackType, CANARY_MULTI_TRACK_TYPE) == 0) {
                    frame.flags = FRAME_FLAG_NONE;
                    frame.trackId = DEFAULT_AUDIO_TRACK_ID;
                    CHK_STATUS(putKinesisVideoFrame(streamHandle, &frame));
                }
                THREAD_SLEEP(HUNDREDS_OF_NANOS_IN_A_SECOND / DEFAULT_FPS_VALUE);
            } else {
                // The intermittent run window has elapsed: pause for a random 1-10 minutes, then start a new window
                canaryStreamRecordFragmentEndSendTime(pCanaryStreamCallbacks, lastKeyFrameTimestamp, frame.presentationTs);
                DLOGD("Last frame type put before stopping: %s", (frame.flags == FRAME_FLAG_KEY_FRAME ? "Key Frame" : "Non key frame"));
                UINT64 sleepTime = ((RAND() % 10) + 1) * HUNDREDS_OF_NANOS_IN_A_MINUTE;
                DLOGD("Intermittent sleep time is set to: %" PRIu64 " minutes", sleepTime / HUNDREDS_OF_NANOS_IN_A_MINUTE);
                THREAD_SLEEP(sleepTime);
                // Reset runTill after 1 run of intermittent scenario
                randomTime = (RAND() % 10) + 1;
                DLOGD("Intermittent run time is set to: %" PRIu64 " minutes", randomTime);
                runTill = GETTIME() + randomTime * HUNDREDS_OF_NANOS_IN_A_MINUTE;
            }
            // We measure this after first call to ensure that the latency is measured after the first SUCCESSFUL
            // putKinesisVideoFrame() call
            if (firstFrame) {
                startUpLatency = (DOUBLE)(GETTIME() - startTime) / (DOUBLE) HUNDREDS_OF_NANOS_IN_A_MILLISECOND;
                CHK_STATUS(pushStartUpLatency(pCanaryStreamCallbacks, startUpLatency));
                DLOGD("Start up latency: %lf ms", startUpLatency);
                firstFrame = FALSE;
            }

            frame.decodingTs = GETTIME(); // current time
            frame.presentationTs = frame.decodingTs;
            frameIndex++;
        }

        // Normal exit from the streaming loop: release everything while the cloudwatch clients
        // are still in scope so that the final logs can be sent below.
        CHK_LOG_ERR(retStatus);
        SAFE_MEMFREE(frame.frameData);
        freeDeviceInfo(&pDeviceInfo);
        freeStreamInfoProvider(&pStreamInfo);
        freeKinesisVideoStream(&streamHandle);
        freeKinesisVideoClient(&clientHandle);
        freeCallbacksProvider(&pClientCallbacks); // This will also take care of freeing canaryStreamCallbacks
        RESET_INSTRUMENTED_ALLOCATORS();
        DLOGI("CleanUp Done");
        cleanUpDone = TRUE;
        if (!fileLoggingEnabled) {
            // This is necessary to ensure that we do not lose the last set of logs
            canaryStreamSendLogSync(&cloudwatchLogsObject);
        }
    }
CleanUp:
    Aws::ShutdownAPI(options);
    CHK_LOG_ERR(retStatus);

    // canaryStreamSendLogSync() will lead to segfault outside the block scope
    // https://docs.aws.amazon.com/sdk-for-cpp/v1/developer-guide/basic-use.html
    // The clean up related logs also need to be captured in cloudwatch logs. This flag
    // will cater to two scenarios, one, if any of the commands fail, this clean up is invoked
    // Second, it will cater to scenarios where a SIG is sent to exit the while loop above in
    // which case the clean up related logs will be captured as well.
    if (!cleanUpDone) {
        CHK_LOG_ERR(retStatus);
        SAFE_MEMFREE(frame.frameData);

        freeDeviceInfo(&pDeviceInfo);
        freeStreamInfoProvider(&pStreamInfo);
        freeKinesisVideoStream(&streamHandle);
        freeKinesisVideoClient(&clientHandle);
        freeCallbacksProvider(&pClientCallbacks); // This will also take care of freeing canaryStreamCallbacks
        RESET_INSTRUMENTED_ALLOCATORS();
        DLOGI("CleanUp Done");
    }
    DLOGI("Exiting application with status code: 0x%08x", retStatus);
    return STATUS_FAILED(retStatus) ? EXIT_FAILURE : EXIT_SUCCESS;
}