in sample_apps_reinvent2021/js/main.js [97:219]
/**
 * Runs the Scheduled Query sample end to end and then tears everything down.
 *
 * Sequence:
 *   1. Creates the database and table named by constants.SQ_DATABASE_NAME /
 *      constants.SQ_TABLE_NAME, then the SNS topic, SQS queue, subscription,
 *      queue access policy, IAM role and IAM policy via timestreamDependencyHelper.
 *   2. Creates, lists and describes a scheduled query.
 *   3. Makes up to 10 receive attempts on the queue and reacts to the
 *      notification type carried by each message.
 *   4. Reads all rows of the target table when at least one run succeeded,
 *      then updates the scheduled query.
 *   5. Always runs the cleanup sequence, whether or not steps 1-4 failed.
 *
 * Errors raised by steps 1-4 are logged with console.log and swallowed.
 *
 * @returns {Promise<void>}
 * @throws The first error raised by a cleanup step. It is rethrown only after
 *     every remaining cleanup step has been attempted, so one failing deletion
 *     no longer leaves the later resources behind.
 */
async function scheduledQueryExamples() {
    const ROLE_ACTIVATION_WAIT_MS = 15000;
    const NOTIFICATION_WAIT_MS = 65000;
    const MAX_RECEIVE_ATTEMPTS = 10;

    const pause = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

    // Declared outside the try block so the cleanup code can see which
    // resources were actually created before a failure.
    let topicArn;
    let queueUrl;
    let subscriptionArn;
    let policyArn;
    let scheduledQueryArn;

    // Runs every cleanup step in the original order. A step whose resource
    // handle was never assigned is skipped; a step that throws is logged and
    // remembered, and the remaining steps still run.
    const cleanUp = async () => {
        const steps = [
            {
                label: "delete scheduled query",
                handle: scheduledQueryArn,
                run: () => scheduleQueryExample.deleteScheduleQuery(scheduledQueryArn),
            },
            {
                label: "detach IAM policy",
                handle: policyArn,
                run: () => timestreamDependencyHelper.detachPolicy(policyArn),
            },
            {
                label: "delete IAM policy",
                handle: policyArn,
                run: () => timestreamDependencyHelper.deleteIAMPolicy(policyArn),
            },
            {
                label: "delete IAM role",
                run: () => timestreamDependencyHelper.deleteIamRole(),
            },
            {
                label: "delete SQS queue",
                handle: queueUrl,
                run: () => timestreamDependencyHelper.deleteSqsQueue(queueUrl),
            },
            {
                label: "unsubscribe from SNS topic",
                handle: subscriptionArn,
                run: () => timestreamDependencyHelper.unsubscribeFromSnsTopic(subscriptionArn),
            },
            {
                label: "delete SNS topic",
                handle: topicArn,
                run: () => timestreamDependencyHelper.deleteSnsTopic(topicArn),
            },
            {
                label: "clear S3 bucket",
                run: () => timestreamDependencyHelper.clearBucket(),
            },
            {
                label: "delete S3 bucket",
                run: () => timestreamDependencyHelper.deleteS3Bucket(),
            },
            {
                label: "delete table",
                run: () => crudAndSimpleIngestionExample.deleteTable(constants.SQ_DATABASE_NAME, constants.SQ_TABLE_NAME),
            },
            {
                label: "delete database",
                run: () => crudAndSimpleIngestionExample.deleteDatabase(constants.SQ_DATABASE_NAME),
            },
        ];

        const failures = [];
        for (const step of steps) {
            // Steps without a "handle" key take no resource argument and always run.
            if ("handle" in step && step.handle == null) {
                console.log(`Skipping cleanup step "${step.label}": the resource was never created`);
                continue;
            }
            try {
                await step.run();
            } catch (cleanupErr) {
                console.log(`Cleanup step "${step.label}" failed:`, cleanupErr);
                failures.push(cleanupErr);
            }
        }

        if (failures.length > 0) {
            // Keep the original contract: a cleanup failure rejects the promise.
            throw failures[0];
        }
    };

    try {
        await crudAndSimpleIngestionExample.createDatabase(constants.SQ_DATABASE_NAME);
        await crudAndSimpleIngestionExample.createTable(constants.SQ_DATABASE_NAME, constants.SQ_TABLE_NAME);

        topicArn = await timestreamDependencyHelper.createSnsTopic();
        queueUrl = await timestreamDependencyHelper.createSqsQueue();
        const queueArn = await timestreamDependencyHelper.getQueueArn(queueUrl);
        subscriptionArn = await timestreamDependencyHelper.subscribeToSnsTopic(topicArn, queueArn);
        await timestreamDependencyHelper.setSqsAccessPolicy(queueUrl, topicArn, queueArn);

        const roleArn = await timestreamDependencyHelper.createIamRole();
        policyArn = await timestreamDependencyHelper.createIAMPolicy();
        await timestreamDependencyHelper.attachIAMRolePolicy(policyArn);

        // Give the newly created role time to become active before using it.
        console.log("Waiting 15 seconds for newly created role to become active");
        await pause(ROLE_ACTIVATION_WAIT_MS);

        // To exercise the failure path instead, swap createScheduledQuery for
        // scheduleQueryExample.createInvalidScheduleQuery (same arguments).
        scheduledQueryArn = await scheduleQueryExample.createScheduledQuery(
            topicArn,
            roleArn,
            s3ErrorReportBucketName,
            constants.SQ_DATABASE_NAME,
            constants.SQ_TABLE_NAME
        );
        await scheduleQueryExample.listScheduledQueries();
        await scheduleQueryExample.describeScheduledQuery(scheduledQueryArn);

        console.log("Waiting 65 seconds for automatic ScheduledQuery executions & notifications");
        await pause(NOTIFICATION_WAIT_MS);

        let didQuerySucceedManually = false;
        let wasQueryTriggeredAsExpected = false;
        let queryFailed = false;

        for (let attempt = 0; attempt < MAX_RECEIVE_ATTEMPTS; attempt++) {
            const response = await timestreamDependencyHelper.receiveMessage(queueUrl);
            if (response == null) {
                // Nothing was delivered on this attempt. The previous check
                // compared against the string 'undefined', never matched, and
                // let the property access below throw.
                continue;
            }

            const receiptHandle = response.ReceiptHandle;

            // The message body carries a JSON document embedded as an escaped
            // string. Strip the escaping and the surrounding quotes so a single
            // JSON.parse yields the nested object. This is a purely textual
            // rewrite: it would also alter any payload string that happens to
            // contain `"{` or `}"`.
            const bodyStr = response.Body
                .replace(/\\"/g, '"')
                .replace(/"{/g, '{')
                .replace(/}"/g, '}');
            const bodyJson = JSON.parse(bodyStr);
            const notificationType = bodyJson["MessageAttributes"]["notificationType"]["Value"];

            // Per the original author's note the queue is FIFO, so the message
            // has to be deleted before further messages can be received.
            await timestreamDependencyHelper.deleteMessage(queueUrl, receiptHandle);

            switch (notificationType) {
                case 'SCHEDULED_QUERY_CREATING':
                    // Fall-through
                case 'SCHEDULED_QUERY_CREATED':
                    console.log("Scheduled Query is still pending to run");
                    break;
                case 'MANUAL_TRIGGER_SUCCESS':
                    console.log("Manual execution of Scheduled Query succeeded");
                    didQuerySucceedManually = true;
                    break;
                case 'AUTO_TRIGGER_SUCCESS':
                    console.log("Scheduled Query was triggered as expected. Now triggering another run manually");
                    wasQueryTriggeredAsExpected = true;
                    await scheduleQueryExample.executeScheduledQuery(scheduledQueryArn, new Date());
                    break;
                case 'AUTO_TRIGGER_FAILURE':
                    // Fall-through
                case 'MANUAL_TRIGGER_FAILURE': {
                    queryFailed = true;
                    const runSummary = bodyJson["Message"]["scheduledQueryRunSummary"];
                    console.log(`Failure Reason from SQS Notification: ${runSummary["failureReason"]}`);
                    const errorReportLocation = runSummary["errorReportLocation"];
                    // Only parse the S3 error report when the notification
                    // actually includes a location for one.
                    if (errorReportLocation != null) {
                        const errorReportPrefix = errorReportLocation["s3ReportLocation"]["objectKey"];
                        await scheduleQueryExample.parseS3ErrorReport(s3ErrorReportBucketName, errorReportPrefix);
                    }
                    break;
                }
                case 'SCHEDULED_QUERY_DELETED':
                    // Fall-through
                case 'SCHEDULED_QUERY_UPDATED':
                    // Fall-through
                default:
                    // Serialize the message; string concatenation of the object
                    // would only print "[object Object]".
                    console.log(`Received an unexpected message: ${JSON.stringify(response)}`);
            }

            // Stop once both an automatic and a manual run have succeeded, or
            // as soon as any run has failed.
            if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed) {
                break;
            }
        }

        if (wasQueryTriggeredAsExpected || didQuerySucceedManually) {
            console.log("Fetching Scheduled Query execution results");
            const queryStr = `SELECT * FROM ${constants.SQ_DATABASE_NAME}.${constants.SQ_TABLE_NAME}`;
            await queryExample.getAllRows(queryStr, null);
        }

        await scheduleQueryExample.updateScheduledQueries(scheduledQueryArn);
    } catch (err) {
        console.log(err);
    } finally {
        await cleanUp();
    }
}