in sample_apps_reinvent2021/java/src/main/java/com/amazonaws/services/timestream/Main.java [132:283]
public static void scheduledQueryExamples(CrudAndSimpleIngestionExample crudAndSimpleIngestionExample,
ScheduledQueryExample scheduledQueryExample,
QueryExample queryExample,
TimestreamDependencyHelper timestreamDependencyHelper,
String s3ErrorReportBucketName,
String region) {
Region regionIAM = Region.AWS_GLOBAL;
IamClient iamClient = IamClient.builder().region(regionIAM).build();
SnsClient snsClient = SnsClient.builder().region(Region.of(region)).build();
SqsClient sqsClient = SqsClient.builder().region(Region.of(region)).build();
String policyArn = null;
String subscriptionArn = null;
String topicArn = null;
String scheduledQueryArn = null;
try {
//Create database and table to store scheduled query results
crudAndSimpleIngestionExample.createDatabase(SQ_DATABASE_NAME);
crudAndSimpleIngestionExample.createTable(SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);
//Create sns and sqs for scheduled query
topicArn = timestreamDependencyHelper.createSNSTopic(snsClient, TOPIC);
String queueUrl = timestreamDependencyHelper.createSQSQueue(sqsClient, QUEUE_NAME);
//Need to wait atleast 1s after creating queue to use it
wait(2);
String queue_arn = timestreamDependencyHelper.getQueueArn(sqsClient, queueUrl);
subscriptionArn = timestreamDependencyHelper.subscribeToSnsTopic(snsClient, topicArn, queue_arn);
timestreamDependencyHelper.setSqsAccessPolicy(sqsClient, queueUrl, topicArn, queue_arn);
String roleArn = TimestreamDependencyHelper.createIAMRole(iamClient, ROLE_NAME, region);
policyArn = TimestreamDependencyHelper.createIAMPolicy(iamClient, POLICY_NAME);
TimestreamDependencyHelper.attachIAMRolePolicy(iamClient, ROLE_NAME, policyArn);
//Waiting for newly created role to be active
System.out.println("Waiting 15 seconds for newly created role to become active");
wait(15);
//Scheduled Query Activities
/* Switch between Valid and Invalid Query to test Happy-case and Failure scenarios */
scheduledQueryArn = scheduledQueryExample.createValidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);
// scheduledQueryArn = scheduledQueryExample.createInvalidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);
scheduledQueryExample.listScheduledQueries();
scheduledQueryExample.describeScheduledQueries(scheduledQueryArn);
// Sleep for 65 seconds to let ScheduledQuery run
System.out.println("Waiting 65 seconds for automatic ScheduledQuery executions & notifications");
Thread.sleep(65000);
boolean didQuerySucceedManually = false;
boolean wasQueryTriggeredAsExpected = false;
boolean queryFailed = false;
for (int i = 0; i < 10; i++) {
JsonObject response = timestreamDependencyHelper.receiveMessage(queueUrl, sqsClient);
System.out.println("Response: " + response);
if (response == null) {
continue;
}
// Since its a FIFO queue, we need to delete the message to be able to receive further messages
timestreamDependencyHelper.deleteMessage(queueUrl, sqsClient, response.get(RECEIPT_HANDLE).getAsString());
switch (ScheduledQueryExample.NotificationType.valueOf(response.get(NOTIFICATION_TYPE).getAsString())) {
case SCHEDULED_QUERY_CREATING:
// Scheduled Query is still pending to run.
// Fall-through
case SCHEDULED_QUERY_CREATED:
// Scheduled Query is still pending to run.
// Fall-through
case SCHEDULED_QUERY_DELETED:
// Fall-through
case SCHEDULED_QUERY_UPDATED:
// Fall-through
break;
case MANUAL_TRIGGER_SUCCESS:
System.out.println("Manual execution of Scheduled Query succeeded");
didQuerySucceedManually = true;
break;
case AUTO_TRIGGER_SUCCESS:
System.out.println("Scheduled Query was triggered as expected. Now triggering another run manually");
wasQueryTriggeredAsExpected = true;
scheduledQueryExample.executeScheduledQuery(scheduledQueryArn);
break;
case AUTO_TRIGGER_FAILURE:
// Fall-through
case MANUAL_TRIGGER_FAILURE:
queryFailed = true;
JsonObject scheduledQueryRunSummary =
response.get(MESSAGE).getAsJsonObject()
.get("scheduledQueryRunSummary").getAsJsonObject();
System.out.println("Failure Reason from SQS Notification:: " +
scheduledQueryRunSummary.get("failureReason").getAsString());
if (scheduledQueryRunSummary.has(ERROR_REPORT_LOCATION)) {
// Error Notification has Error report associated with it. We can parse it.
scheduledQueryExample.parseS3ErrorReport(s3Client, s3ErrorReportBucketName,
scheduledQueryRunSummary.get(ERROR_REPORT_LOCATION).getAsJsonObject()
.get(S3_REPORT_LOCATION).getAsJsonObject()
.get(OBJECT_KEY).getAsString());
}
break;
default:
System.out.println("Received an unexpected message:: " + response);
}
if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed) {
break;
}
}
if (wasQueryTriggeredAsExpected || didQuerySucceedManually) {
System.out.println("Fetching Scheduled Query execution results");
queryExample.runQuery("SELECT * FROM " + SQ_DATABASE_NAME + "." + SQ_TABLE_NAME);
}
scheduledQueryExample.updateScheduledQuery(scheduledQueryArn, ScheduledQueryState.DISABLED);
} catch (AwsServiceException e) {
System.out.println(e.awsErrorDetails().errorMessage());
throw e;
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
//Clean up for scheduled query
if (scheduledQueryArn != null) {
scheduledQueryExample.deleteScheduledQuery(scheduledQueryArn);
}
if (policyArn != null) {
TimestreamDependencyHelper.detachPolicy(iamClient, ROLE_NAME, policyArn);
TimestreamDependencyHelper.deleteIAMPolicy(iamClient, policyArn);
TimestreamDependencyHelper.deleteIAMRole(iamClient, ROLE_NAME);
}
if (subscriptionArn != null) {
timestreamDependencyHelper.unsubscribeFromSnsTopic(subscriptionArn, snsClient);
TimestreamDependencyHelper.deleteSQSQueue(sqsClient, QUEUE_NAME);
}
if (topicArn != null) {
timestreamDependencyHelper.deleteSNSTopic(snsClient, topicArn);
}
crudAndSimpleIngestionExample.deleteTable(SQ_TABLE_NAME, SQ_DATABASE_NAME);
crudAndSimpleIngestionExample.deleteDatabase(SQ_DATABASE_NAME);
}
}