async function scheduledQueryExamples()

in sample_apps_reinvent2021/js/main.js [97:219]


/**
 * Runs the Scheduled Query sample end to end and then tears everything down.
 *
 * Flow: create the target database and table, wire an SNS topic to an SQS queue,
 * create the IAM role and policy, create a scheduled query, poll the queue for
 * its notifications (triggering one manual run after each automatic success),
 * print the rows written by the query, and finally call updateScheduledQueries.
 *
 * Errors raised by the main flow are logged and not rethrown. Cleanup always
 * runs, and every cleanup step is isolated so that a single failing step cannot
 * prevent the remaining resources from being released.
 *
 * @returns {Promise<void>} Resolves once the example and its cleanup have finished.
 */
async function scheduledQueryExamples() {
    const ROLE_PROPAGATION_WAIT_MS = 15000;
    const NOTIFICATION_WAIT_MS = 65000;
    const MAX_POLL_ATTEMPTS = 10;

    const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    // Declared outside `try` so that `finally` can tell which resources were
    // actually created before a failure occurred.
    let topicArn;
    let queueUrl;
    let subscriptionArn;
    let policyArn;
    let scheduledQueryArn;

    try {
        await crudAndSimpleIngestionExample.createDatabase(constants.SQ_DATABASE_NAME);
        await crudAndSimpleIngestionExample.createTable(constants.SQ_DATABASE_NAME, constants.SQ_TABLE_NAME);

        topicArn = await timestreamDependencyHelper.createSnsTopic();
        queueUrl = await timestreamDependencyHelper.createSqsQueue();

        const queueArn = await timestreamDependencyHelper.getQueueArn(queueUrl);
        subscriptionArn = await timestreamDependencyHelper.subscribeToSnsTopic(topicArn, queueArn);
        await timestreamDependencyHelper.setSqsAccessPolicy(queueUrl, topicArn, queueArn);

        const roleArn = await timestreamDependencyHelper.createIamRole();
        policyArn = await timestreamDependencyHelper.createIAMPolicy();
        await timestreamDependencyHelper.attachIAMRolePolicy(policyArn);

        // A freshly created role is not usable immediately; give it time to become active.
        console.log("Waiting 15 seconds for newly created role to become active");
        await sleep(ROLE_PROPAGATION_WAIT_MS);

        scheduledQueryArn = await scheduleQueryExample.createScheduledQuery(topicArn, roleArn, s3ErrorReportBucketName, constants.SQ_DATABASE_NAME, constants.SQ_TABLE_NAME);
        // Alternative kept for experimentation (presumably exercises the failure path):
        //scheduledQueryArn = await scheduleQueryExample.createInvalidScheduleQuery(topicArn, roleArn, s3ErrorReportBucketName, constants.SQ_DATABASE_NAME, constants.SQ_TABLE_NAME);

        await scheduleQueryExample.listScheduledQueries();
        await scheduleQueryExample.describeScheduledQuery(scheduledQueryArn);

        console.log("Waiting 65 seconds for automatic ScheduledQuery executions & notifications");
        await sleep(NOTIFICATION_WAIT_MS);

        let didQuerySucceedManually = false;
        let wasQueryTriggeredAsExpected = false;
        let queryFailed = false;

        for (let attempt = 0; attempt < MAX_POLL_ATTEMPTS; attempt++) {
            const response = await timestreamDependencyHelper.receiveMessage(queueUrl);
            // An empty poll yields no message object; skip it instead of dereferencing it.
            if (response === undefined || response === null) {
                continue;
            }

            const receiptHandle = response.ReceiptHandle;

            // The notification payload arrives as JSON nested inside a JSON string.
            // Strip the escaping and the surrounding quotes so a single JSON.parse
            // yields the nested objects.
            // NOTE(review): this textual un-nesting breaks if any value itself contains
            // an escaped quote or a `"{` / `}"` sequence - confirm against real payloads.
            let bodyStr = response.Body;
            bodyStr = bodyStr.replace(/\\"/g, '"');
            bodyStr = bodyStr.replace(/"{/g, '{');
            bodyStr = bodyStr.replace(/}"/g, '}');

            const bodyJson = JSON.parse(bodyStr);
            const notificationType = bodyJson["MessageAttributes"]["notificationType"]["Value"];

            // Since its a FIFO queue, we need to delete the message to be able to receive further messages
            await timestreamDependencyHelper.deleteMessage(queueUrl, receiptHandle);

            switch (notificationType) {
                case 'SCHEDULED_QUERY_CREATING':
                    // Scheduled Query is still pending to run.
                    // Fall-through
                case 'SCHEDULED_QUERY_CREATED':
                    console.log("Scheduled Query is still pending to run");
                    break;
                case 'MANUAL_TRIGGER_SUCCESS':
                    console.log("Manual execution of Scheduled Query succeeded");
                    didQuerySucceedManually = true;
                    break;
                case 'AUTO_TRIGGER_SUCCESS':
                    console.log("Scheduled Query was triggered as expected. Now triggering another run manually");
                    wasQueryTriggeredAsExpected = true;
                    await scheduleQueryExample.executeScheduledQuery(scheduledQueryArn, new Date());
                    break;
                case 'AUTO_TRIGGER_FAILURE':
                    // Fall-through
                case 'MANUAL_TRIGGER_FAILURE': {
                    queryFailed = true;
                    const scheduledQueryRunSummary = bodyJson["Message"]["scheduledQueryRunSummary"];

                    console.log(`Failure Reason from SQS Notification: ${scheduledQueryRunSummary["failureReason"]}`);
                    // An error report is only present for some failures; parse it only when it exists.
                    const errorReportLocation = scheduledQueryRunSummary["errorReportLocation"];
                    if (errorReportLocation !== undefined && errorReportLocation !== null) {
                        const errorReportPrefix = errorReportLocation["s3ReportLocation"]["objectKey"];
                        await scheduleQueryExample.parseS3ErrorReport(s3ErrorReportBucketName, errorReportPrefix);
                    }
                    break;
                }
                case 'SCHEDULED_QUERY_DELETED':
                    // Fall-through
                case 'SCHEDULED_QUERY_UPDATED':
                    // Fall-through
                default:
                    // Serialize the message so the log shows its content rather than "[object Object]".
                    console.log(`Received an unexpected message: ${JSON.stringify(response)}`);
            }

            // Stop polling once both an automatic and a manual run succeeded, or on any failure.
            if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed) {
                break;
            }
        }

        if (wasQueryTriggeredAsExpected || didQuerySucceedManually) {
            console.log("Fetching Scheduled Query execution results");
            const queryStr = `SELECT * FROM ${constants.SQ_DATABASE_NAME}.${constants.SQ_TABLE_NAME}`;
            await queryExample.getAllRows(queryStr, null);
        }

        await scheduleQueryExample.updateScheduledQueries(scheduledQueryArn);
    } catch (err) {
        console.log(err);
    } finally {
        // Runs one cleanup step and logs (rather than propagates) its failure so
        // that the remaining steps still execute.
        const runCleanupStep = async (description, step) => {
            try {
                await step();
            } catch (cleanupErr) {
                console.log(`Cleanup step failed (${description}):`, cleanupErr);
            }
        };
        // A handle that was never assigned means the resource was never created.
        const wasCreated = (handle) => handle !== undefined && handle !== null;

        if (wasCreated(scheduledQueryArn)) {
            await runCleanupStep("delete scheduled query", () => scheduleQueryExample.deleteScheduleQuery(scheduledQueryArn));
        }
        if (wasCreated(policyArn)) {
            await runCleanupStep("detach IAM policy", () => timestreamDependencyHelper.detachPolicy(policyArn));
            await runCleanupStep("delete IAM policy", () => timestreamDependencyHelper.deleteIAMPolicy(policyArn));
        }
        await runCleanupStep("delete IAM role", () => timestreamDependencyHelper.deleteIamRole());
        if (wasCreated(queueUrl)) {
            await runCleanupStep("delete SQS queue", () => timestreamDependencyHelper.deleteSqsQueue(queueUrl));
        }
        if (wasCreated(subscriptionArn)) {
            await runCleanupStep("unsubscribe from SNS topic", () => timestreamDependencyHelper.unsubscribeFromSnsTopic(subscriptionArn));
        }
        if (wasCreated(topicArn)) {
            await runCleanupStep("delete SNS topic", () => timestreamDependencyHelper.deleteSnsTopic(topicArn));
        }
        await runCleanupStep("clear S3 bucket", () => timestreamDependencyHelper.clearBucket());
        await runCleanupStep("delete S3 bucket", () => timestreamDependencyHelper.deleteS3Bucket());
        await runCleanupStep("delete table", () => crudAndSimpleIngestionExample.deleteTable(constants.SQ_DATABASE_NAME, constants.SQ_TABLE_NAME));
        await runCleanupStep("delete database", () => crudAndSimpleIngestionExample.deleteDatabase(constants.SQ_DATABASE_NAME));
    }
}