in sample_apps_reinvent2021/javaV2/src/main/java/com/amazonaws/services/timestream/Main.java [81:239]
private static void scheduledQueryExamples(CrudAndSimpleIngestionExample crudAndSimpleIngestionExample,
TimestreamQueryClient queryClient, QueryExample queryExample, S3Client s3Client, String region) {
TimestreamDependencyHelper timestreamDependencyHelper = new TimestreamDependencyHelper();
ScheduledQueryExample scheduledQueryExample = new ScheduledQueryExample(queryClient);
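// IAM is a global service, so its client is built against Region.AWS_GLOBAL;
// the SNS and SQS clients use the same region the Timestream resources live in.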
Region regionIAM = Region.AWS_GLOBAL;
IamClient iamClient = IamClient.builder().region(regionIAM).build();
SnsClient snsClient = SnsClient.builder().region(Region.of(region)).build();
SqsClient sqsClient = SqsClient.builder().region(Region.of(region)).build();
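// Track the identifiers of resources as they are created so the finally block
// only cleans up what actually exists.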
String policyArn = null;
String subscriptionArn = null;
String topicArn = null;
String scheduledQueryArn = null;
String s3ErrorReportBucketName = null;
try {
// Create database and table to store scheduled query results
crudAndSimpleIngestionExample.createDatabase(SQ_DATABASE_NAME);
crudAndSimpleIngestionExample.createTable(SQ_DATABASE_NAME, SQ_TABLE_NAME);
// Create the SNS topic and SQS queue used to receive scheduled query notifications
topicArn = timestreamDependencyHelper.createSNSTopic(snsClient, TOPIC);
String queueUrl = timestreamDependencyHelper.createSQSQueue(sqsClient, QUEUE_NAME);
String sqsQueueArn = timestreamDependencyHelper.getQueueArn(sqsClient, queueUrl);
subscriptionArn = timestreamDependencyHelper.subscribeToSnsTopic(snsClient, topicArn, sqsQueueArn);
timestreamDependencyHelper.setSqsAccessPolicy(sqsClient, queueUrl, topicArn, sqsQueueArn);
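// Create the IAM role that the scheduled query assumes at run time and attach the
// policy granting the permissions it needs (the exact policy statements are defined
// in TimestreamDependencyHelper).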
String roleArn = timestreamDependencyHelper.createIAMRole(iamClient, ROLE_NAME, region);
policyArn = timestreamDependencyHelper.createIAMPolicy(iamClient, POLICY_NAME);
timestreamDependencyHelper.attachIAMRolePolicy(iamClient, ROLE_NAME, policyArn);
// Wait at least 15 seconds for the newly created role to become active
System.out.println("Waiting 15 seconds for newly created role to become active");
Thread.sleep(15000);
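// Create the S3 bucket where Timestream writes error reports for failed scheduled query runs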
// Make the bucket name unique by appending 5 random characters at the end
s3ErrorReportBucketName =
timestreamDependencyHelper.createS3Bucket(s3Client, ERROR_CONFIGURATION_S3_BUCKET_NAME_PREFIX +
RandomStringUtils.randomAlphanumeric(5).toLowerCase());
// Scheduled query activities
/* Switch between the valid and the invalid query to exercise the happy-path and failure scenarios */
scheduledQueryArn = scheduledQueryExample.createValidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);
//scheduledQueryArn = scheduledQueryExample.createInvalidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);
scheduledQueryExample.listScheduledQueries();
scheduledQueryExample.describeScheduledQueries(scheduledQueryArn);
// Sleep for 65 seconds to let the scheduled query run automatically at least once
System.out.println("Waiting 65 seconds for automatic ScheduledQuery executions & notifications");
Thread.sleep(65000);
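// Poll the SQS queue (up to 10 times) for notifications published to the SNS topic.
// The loop exits once both the automatic and the manually triggered runs have
// succeeded, or as soon as any run reports a failure.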
boolean didQuerySucceedManually = false;
boolean wasQueryTriggeredAsExpected = false;
boolean queryFailed = false;
for (int i = 0; i < 10; i++) {
JsonObject response = timestreamDependencyHelper.receiveMessage(queueUrl, sqsClient);
System.out.println(response);
if (response == null) {
continue;
}
// Since it's a FIFO queue, the message must be deleted before further messages can be received
timestreamDependencyHelper.deleteMessage(queueUrl, sqsClient, response.get(RECEIPT_HANDLE).getAsString());
switch (NotificationType.valueOf(response.get(NOTIFICATION_TYPE).getAsString())) {
case SCHEDULED_QUERY_CREATING:
// Scheduled Query is still pending to run.
// Fall-through
case SCHEDULED_QUERY_CREATED:
// Scheduled Query is still pending to run.
break;
case MANUAL_TRIGGER_SUCCESS:
System.out.println("Manual execution of Scheduled Query succeeded");
didQuerySucceedManually = true;
break;
case AUTO_TRIGGER_SUCCESS:
System.out.println("Scheduled Query was triggered as expected. Now triggering another run manually");
wasQueryTriggeredAsExpected = true;
scheduledQueryExample.executeScheduledQuery(scheduledQueryArn);
break;
case AUTO_TRIGGER_FAILURE:
// Fall-through
case MANUAL_TRIGGER_FAILURE:
queryFailed = true;
JsonObject scheduledQueryRunSummary =
response.get(MESSAGE).getAsJsonObject()
.get("scheduledQueryRunSummary").getAsJsonObject();
System.out.println("Failure Reason from SQS Notification:: " +
scheduledQueryRunSummary.get("failureReason").getAsString());
if (scheduledQueryRunSummary.has(ERROR_REPORT_LOCATION)) {
// The failure notification carries an error report location; fetch and parse the report from S3.
scheduledQueryExample.parseS3ErrorReport(s3Client, s3ErrorReportBucketName,
scheduledQueryRunSummary.get(ERROR_REPORT_LOCATION).getAsJsonObject()
.get(S3_REPORT_LOCATION).getAsJsonObject()
.get(OBJECT_KEY).getAsString());
}
break;
case SCHEDULED_QUERY_DELETED:
// Fall-through
case SCHEDULED_QUERY_UPDATED:
// Fall-through
default:
System.out.println("Received an unexpected message:: " + response);
}
if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed) {
break;
}
}
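// If at least one run succeeded, query the target table to confirm that the
// scheduled query actually wrote results.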
if (wasQueryTriggeredAsExpected || didQuerySucceedManually) {
System.out.println("Fetching Scheduled Query execution results");
queryExample.runQuery("SELECT * FROM " + SQ_DATABASE_NAME + "." + SQ_TABLE_NAME);
}
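// Disable the scheduled query so no further automatic runs start before cleanup.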
scheduledQueryExample.updateScheduledQuery(scheduledQueryArn, ScheduledQueryState.DISABLED);
} catch (AwsServiceException e) {
System.out.println(e.awsErrorDetails().errorMessage());
throw e;
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
// Clean up the resources created for the scheduled query example; each null check skips resources that were never created
if (scheduledQueryArn != null) {
scheduledQueryExample.deleteScheduledQuery(scheduledQueryArn);
}
if (policyArn != null) {
timestreamDependencyHelper.detachPolicy(iamClient, ROLE_NAME, policyArn);
timestreamDependencyHelper.deleteIAMPolicy(iamClient, policyArn);
timestreamDependencyHelper.deleteIAMRole(iamClient, ROLE_NAME);
}
if (subscriptionArn != null) {
timestreamDependencyHelper.unsubscribeFromSnsTopic(subscriptionArn, snsClient);
timestreamDependencyHelper.deleteSQSQueue(sqsClient, QUEUE_NAME);
}
if (topicArn != null) {
timestreamDependencyHelper.deleteSNSTopic(snsClient, topicArn);
}
if (s3ErrorReportBucketName != null) {
timestreamDependencyHelper.deleteS3Bucket(s3Client, s3ErrorReportBucketName);
}
crudAndSimpleIngestionExample.deleteTable(SQ_TABLE_NAME, SQ_DATABASE_NAME);
crudAndSimpleIngestionExample.deleteDatabase(SQ_DATABASE_NAME);
}
}