public static void scheduledQueryExamples()

in sample_apps_reinvent2021/java/src/main/java/com/amazonaws/services/timestream/Main.java [132:283]


    public static void scheduledQueryExamples(CrudAndSimpleIngestionExample crudAndSimpleIngestionExample,
            ScheduledQueryExample scheduledQueryExample,
            QueryExample queryExample,
            TimestreamDependencyHelper timestreamDependencyHelper,
            String s3ErrorReportBucketName,
            String region) {
        Region regionIAM = Region.AWS_GLOBAL;
        IamClient iamClient = IamClient.builder().region(regionIAM).build();
        SnsClient snsClient = SnsClient.builder().region(Region.of(region)).build();
        SqsClient sqsClient = SqsClient.builder().region(Region.of(region)).build();
        String policyArn = null;
        String subscriptionArn = null;
        String topicArn = null;
        String scheduledQueryArn = null;

        try {
            //Create database and table to store scheduled query results
            crudAndSimpleIngestionExample.createDatabase(SQ_DATABASE_NAME);
            crudAndSimpleIngestionExample.createTable(SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);

            //Create sns and sqs for scheduled query
            topicArn = timestreamDependencyHelper.createSNSTopic(snsClient, TOPIC);
            String queueUrl = timestreamDependencyHelper.createSQSQueue(sqsClient, QUEUE_NAME);
            //Need to wait atleast 1s after creating queue to use it
            wait(2);

            String queue_arn = timestreamDependencyHelper.getQueueArn(sqsClient, queueUrl);

            subscriptionArn = timestreamDependencyHelper.subscribeToSnsTopic(snsClient, topicArn, queue_arn);
            timestreamDependencyHelper.setSqsAccessPolicy(sqsClient, queueUrl, topicArn, queue_arn);

            String roleArn = TimestreamDependencyHelper.createIAMRole(iamClient, ROLE_NAME, region);
            policyArn = TimestreamDependencyHelper.createIAMPolicy(iamClient, POLICY_NAME);
            TimestreamDependencyHelper.attachIAMRolePolicy(iamClient, ROLE_NAME, policyArn);

            //Waiting for newly created role to be active
            System.out.println("Waiting 15 seconds for newly created role to become active");
            wait(15);

            //Scheduled Query Activities

            /* Switch between Valid and Invalid Query to test Happy-case and Failure scenarios */
            scheduledQueryArn = scheduledQueryExample.createValidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);
            // scheduledQueryArn = scheduledQueryExample.createInvalidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);

            scheduledQueryExample.listScheduledQueries();
            scheduledQueryExample.describeScheduledQueries(scheduledQueryArn);

            // Sleep for 65 seconds to let ScheduledQuery run
            System.out.println("Waiting 65 seconds for automatic ScheduledQuery executions & notifications");
            Thread.sleep(65000);

            boolean didQuerySucceedManually = false;
            boolean wasQueryTriggeredAsExpected = false;
            boolean queryFailed = false;

            for (int i = 0; i < 10; i++) {
                JsonObject response = timestreamDependencyHelper.receiveMessage(queueUrl, sqsClient);
                System.out.println("Response: " + response);
                if (response == null) {
                    continue;
                }

                // Since its a FIFO queue, we need to delete the message to be able to receive further messages
                timestreamDependencyHelper.deleteMessage(queueUrl, sqsClient, response.get(RECEIPT_HANDLE).getAsString());

                switch (ScheduledQueryExample.NotificationType.valueOf(response.get(NOTIFICATION_TYPE).getAsString())) {
                    case SCHEDULED_QUERY_CREATING:
                        // Scheduled Query is still pending to run.
                        // Fall-through
                    case SCHEDULED_QUERY_CREATED:
                        // Scheduled Query is still pending to run.
                        // Fall-through
                    case SCHEDULED_QUERY_DELETED:
                        // Fall-through
                    case SCHEDULED_QUERY_UPDATED:
                        // Fall-through
                        break;
                    case MANUAL_TRIGGER_SUCCESS:
                        System.out.println("Manual execution of Scheduled Query succeeded");
                        didQuerySucceedManually = true;
                        break;
                    case AUTO_TRIGGER_SUCCESS:
                        System.out.println("Scheduled Query was triggered as expected. Now triggering another run manually");
                        wasQueryTriggeredAsExpected = true;
                        scheduledQueryExample.executeScheduledQuery(scheduledQueryArn);
                        break;
                    case AUTO_TRIGGER_FAILURE:
                        // Fall-through
                    case MANUAL_TRIGGER_FAILURE:
                        queryFailed = true;
                        JsonObject scheduledQueryRunSummary =
                                response.get(MESSAGE).getAsJsonObject()
                                        .get("scheduledQueryRunSummary").getAsJsonObject();

                        System.out.println("Failure Reason from SQS Notification:: " +
                                scheduledQueryRunSummary.get("failureReason").getAsString());

                        if (scheduledQueryRunSummary.has(ERROR_REPORT_LOCATION)) {
                            // Error Notification has Error report associated with it. We can parse it.
                            scheduledQueryExample.parseS3ErrorReport(s3Client, s3ErrorReportBucketName,
                                    scheduledQueryRunSummary.get(ERROR_REPORT_LOCATION).getAsJsonObject()
                                            .get(S3_REPORT_LOCATION).getAsJsonObject()
                                            .get(OBJECT_KEY).getAsString());
                        }
                        break;
                    default:
                        System.out.println("Received an unexpected message:: " + response);
                }

                if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed) {
                    break;
                }
            }

            if (wasQueryTriggeredAsExpected || didQuerySucceedManually) {
                System.out.println("Fetching Scheduled Query execution results");
                queryExample.runQuery("SELECT * FROM " + SQ_DATABASE_NAME + "." + SQ_TABLE_NAME);
            }

            scheduledQueryExample.updateScheduledQuery(scheduledQueryArn, ScheduledQueryState.DISABLED);
        } catch (AwsServiceException e) {
            System.out.println(e.awsErrorDetails().errorMessage());
            throw e;
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }  finally {
            //Clean up for scheduled query
            if (scheduledQueryArn != null) {
                scheduledQueryExample.deleteScheduledQuery(scheduledQueryArn);
            }

            if (policyArn != null) {
                TimestreamDependencyHelper.detachPolicy(iamClient, ROLE_NAME, policyArn);
                TimestreamDependencyHelper.deleteIAMPolicy(iamClient, policyArn);
                TimestreamDependencyHelper.deleteIAMRole(iamClient, ROLE_NAME);
            }

            if (subscriptionArn != null) {
                timestreamDependencyHelper.unsubscribeFromSnsTopic(subscriptionArn, snsClient);
                TimestreamDependencyHelper.deleteSQSQueue(sqsClient, QUEUE_NAME);
            }


            if (topicArn != null) {
                timestreamDependencyHelper.deleteSNSTopic(snsClient, topicArn);
            }

            crudAndSimpleIngestionExample.deleteTable(SQ_TABLE_NAME, SQ_DATABASE_NAME);
            crudAndSimpleIngestionExample.deleteDatabase(SQ_DATABASE_NAME);
        }
    }