private static void scheduledQueryExamples()

in sample_apps_reinvent2021/javaV2/src/main/java/com/amazonaws/services/timestream/Main.java [81:239]


    /**
     * Runs the scheduled query sample end to end and then tears down everything it created.
     *
     * <p>The method creates the result database and table, an SNS topic with a subscribed SQS queue,
     * an IAM role with an attached policy, and an S3 bucket for error reports. It then creates a
     * scheduled query, polls the queue (at most 10 receive attempts) for notifications, triggers one
     * manual run after the first automatic success, prints the query results and finally disables
     * the scheduled query.
     *
     * <p>Cleanup always runs. Every cleanup step is attempted even when an earlier one fails; the
     * first cleanup failure is rethrown once all steps have run, unless an {@code AwsServiceException}
     * from the main flow is already propagating, in which case the cleanup failure is attached to it
     * as a suppressed exception.
     *
     * @param crudAndSimpleIngestionExample used to create and delete the result database and table
     * @param queryClient                   query client handed to {@code ScheduledQueryExample}
     * @param queryExample                  used to read back the scheduled query results
     * @param s3Client                      used for the error report bucket and for parsing error reports
     * @param region                        region name used to build the SNS and SQS clients and passed
     *                                      to the IAM role creation helper
     * @throws AwsServiceException if a service call in the main flow fails (logged, then rethrown)
     */
    private static void scheduledQueryExamples(CrudAndSimpleIngestionExample crudAndSimpleIngestionExample,
            TimestreamQueryClient queryClient, QueryExample queryExample, S3Client s3Client, String region) {

        TimestreamDependencyHelper timestreamDependencyHelper = new TimestreamDependencyHelper();
        ScheduledQueryExample scheduledQueryExample = new ScheduledQueryExample(queryClient);

        Region regionIAM = Region.AWS_GLOBAL;
        IamClient iamClient = IamClient.builder().region(regionIAM).build();
        SnsClient snsClient = SnsClient.builder().region(Region.of(region)).build();
        SqsClient sqsClient = SqsClient.builder().region(Region.of(region)).build();
        String policyArn = null;
        String subscriptionArn = null;
        String topicArn = null;
        String scheduledQueryArn = null;
        String s3ErrorReportBucketName = null;

        // Track the role and queue separately from the policy/subscription so that they are still
        // removed when a later setup step (policy creation, subscription, ...) fails.
        boolean roleCreated = false;
        boolean queueCreated = false;
        boolean interrupted = false;
        AwsServiceException primaryFailure = null;

        try {

            //Create database and table to store scheduled query results
            crudAndSimpleIngestionExample.createDatabase(SQ_DATABASE_NAME);
            crudAndSimpleIngestionExample.createTable(SQ_DATABASE_NAME, SQ_TABLE_NAME);

            //Create sns and sqs for scheduled query
            topicArn = timestreamDependencyHelper.createSNSTopic(snsClient, TOPIC);
            String queueUrl = timestreamDependencyHelper.createSQSQueue(sqsClient, QUEUE_NAME);
            queueCreated = true;

            String sqsQueueArn = timestreamDependencyHelper.getQueueArn(sqsClient, queueUrl);

            subscriptionArn = timestreamDependencyHelper.subscribeToSnsTopic(snsClient, topicArn, sqsQueueArn);
            timestreamDependencyHelper.setSqsAccessPolicy(sqsClient, queueUrl, topicArn, sqsQueueArn);

            String roleArn = timestreamDependencyHelper.createIAMRole(iamClient, ROLE_NAME, region);
            roleCreated = true;
            policyArn = timestreamDependencyHelper.createIAMPolicy(iamClient, POLICY_NAME);
            timestreamDependencyHelper.attachIAMRolePolicy(iamClient, ROLE_NAME, policyArn);

            // Wait at least 15s for newly created role to be active
            System.out.println("Waiting 15 seconds for newly created role to become active");
            Thread.sleep(15000);

            // Make the bucket name unique by appending 5 random characters at the end
            s3ErrorReportBucketName =
                    timestreamDependencyHelper.createS3Bucket(s3Client, ERROR_CONFIGURATION_S3_BUCKET_NAME_PREFIX +
                            RandomStringUtils.randomAlphanumeric(5).toLowerCase());

            //Scheduled Query Activities

            /* Switch between Valid and Invalid Query to test Happy-case and Failure scenarios */
            scheduledQueryArn = scheduledQueryExample.createValidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);
            //scheduledQueryArn = scheduledQueryExample.createInvalidScheduledQuery(topicArn, roleArn, SQ_DATABASE_NAME, SQ_TABLE_NAME, s3ErrorReportBucketName);

            scheduledQueryExample.listScheduledQueries();
            scheduledQueryExample.describeScheduledQueries(scheduledQueryArn);

            // Sleep for 65 seconds to let ScheduledQuery run
            System.out.println("Waiting 65 seconds for automatic ScheduledQuery executions & notifications");
            Thread.sleep(65000);

            boolean didQuerySucceedManually = false;
            boolean wasQueryTriggeredAsExpected = false;
            boolean queryFailed = false;
            for (int i = 0; i < 10; i++) {
                JsonObject response = timestreamDependencyHelper.receiveMessage(queueUrl, sqsClient);
                System.out.println(response);
                if (response == null) {
                    continue;
                }

                // Since it's a FIFO queue, we need to delete the message to be able to receive further messages
                timestreamDependencyHelper.deleteMessage(queueUrl, sqsClient, response.get(RECEIPT_HANDLE).getAsString());

                switch (NotificationType.valueOf(response.get(NOTIFICATION_TYPE).getAsString())) {
                    case SCHEDULED_QUERY_CREATING:
                        // Scheduled Query is still pending to run.
                        // Fall-through
                    case SCHEDULED_QUERY_CREATED:
                        // Scheduled Query is still pending to run.
                        break;
                    case MANUAL_TRIGGER_SUCCESS:
                        System.out.println("Manual execution of Scheduled Query succeeded");
                        didQuerySucceedManually = true;
                        break;
                    case AUTO_TRIGGER_SUCCESS:
                        System.out.println("Scheduled Query was triggered as expected. Now triggering another run manually");
                        wasQueryTriggeredAsExpected = true;
                        scheduledQueryExample.executeScheduledQuery(scheduledQueryArn);
                        break;
                    case AUTO_TRIGGER_FAILURE:
                        // Fall-through
                    case MANUAL_TRIGGER_FAILURE:
                        queryFailed = true;
                        JsonObject scheduledQueryRunSummary =
                                response.get(MESSAGE).getAsJsonObject()
                                        .get("scheduledQueryRunSummary").getAsJsonObject();

                        System.out.println("Failure Reason from SQS Notification:: " +
                                scheduledQueryRunSummary.get("failureReason").getAsString());

                        if (scheduledQueryRunSummary.has(ERROR_REPORT_LOCATION)) {
                            // Error Notification has Error report associated with it. We can parse it.
                            scheduledQueryExample.parseS3ErrorReport(s3Client, s3ErrorReportBucketName,
                                    scheduledQueryRunSummary.get(ERROR_REPORT_LOCATION).getAsJsonObject()
                                            .get(S3_REPORT_LOCATION).getAsJsonObject()
                                            .get(OBJECT_KEY).getAsString());
                        }
                        break;
                    case SCHEDULED_QUERY_DELETED:
                        // Fall-through
                    case SCHEDULED_QUERY_UPDATED:
                        // Fall-through
                    default:
                        System.out.println("Received an unexpected message:: " + response);
                }

                // Stop polling once both an automatic and a manual run succeeded, or any run failed
                if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed) {
                    break;
                }
            }

            if (wasQueryTriggeredAsExpected || didQuerySucceedManually) {
                System.out.println("Fetching Scheduled Query execution results");
                queryExample.runQuery("SELECT * FROM " + SQ_DATABASE_NAME + "." + SQ_TABLE_NAME);
            }

            scheduledQueryExample.updateScheduledQuery(scheduledQueryArn, ScheduledQueryState.DISABLED);
        } catch (AwsServiceException e) {
            // awsErrorDetails() may be absent; fall back to the exception message instead of
            // hitting a NullPointerException that would hide the original failure.
            System.out.println(e.awsErrorDetails() != null ? e.awsErrorDetails().errorMessage() : e.getMessage());
            primaryFailure = e;
            throw e;
        } catch (InterruptedException e) {
            // Remember the interruption; the flag is restored after cleanup (see the finally block).
            interrupted = true;
            System.out.println(e.getMessage());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        } finally {
            //Clean up for scheduled query
            // Lambdas can only capture effectively final locals, so snapshot the identifiers first.
            final String scheduledQueryArnToDelete = scheduledQueryArn;
            final String policyArnToDelete = policyArn;
            final String subscriptionArnToDelete = subscriptionArn;
            final String topicArnToDelete = topicArn;
            final String bucketNameToDelete = s3ErrorReportBucketName;

            RuntimeException cleanupFailure = null;

            if (scheduledQueryArnToDelete != null) {
                cleanupFailure = runScheduledQueryCleanupStep("delete scheduled query",
                        () -> scheduledQueryExample.deleteScheduledQuery(scheduledQueryArnToDelete), cleanupFailure);
            }

            if (policyArnToDelete != null) {
                cleanupFailure = runScheduledQueryCleanupStep("detach IAM policy",
                        () -> timestreamDependencyHelper.detachPolicy(iamClient, ROLE_NAME, policyArnToDelete),
                        cleanupFailure);
                cleanupFailure = runScheduledQueryCleanupStep("delete IAM policy",
                        () -> timestreamDependencyHelper.deleteIAMPolicy(iamClient, policyArnToDelete),
                        cleanupFailure);
            }

            if (roleCreated) {
                cleanupFailure = runScheduledQueryCleanupStep("delete IAM role",
                        () -> timestreamDependencyHelper.deleteIAMRole(iamClient, ROLE_NAME), cleanupFailure);
            }

            if (subscriptionArnToDelete != null) {
                cleanupFailure = runScheduledQueryCleanupStep("unsubscribe from SNS topic",
                        () -> timestreamDependencyHelper.unsubscribeFromSnsTopic(subscriptionArnToDelete, snsClient),
                        cleanupFailure);
            }

            if (queueCreated) {
                cleanupFailure = runScheduledQueryCleanupStep("delete SQS queue",
                        () -> timestreamDependencyHelper.deleteSQSQueue(sqsClient, QUEUE_NAME), cleanupFailure);
            }

            if (topicArnToDelete != null) {
                cleanupFailure = runScheduledQueryCleanupStep("delete SNS topic",
                        () -> timestreamDependencyHelper.deleteSNSTopic(snsClient, topicArnToDelete), cleanupFailure);
            }

            if (bucketNameToDelete != null) {
                cleanupFailure = runScheduledQueryCleanupStep("delete S3 bucket",
                        () -> timestreamDependencyHelper.deleteS3Bucket(s3Client, bucketNameToDelete), cleanupFailure);
            }

            cleanupFailure = runScheduledQueryCleanupStep("delete table",
                    () -> crudAndSimpleIngestionExample.deleteTable(SQ_TABLE_NAME, SQ_DATABASE_NAME), cleanupFailure);
            cleanupFailure = runScheduledQueryCleanupStep("delete database",
                    () -> crudAndSimpleIngestionExample.deleteDatabase(SQ_DATABASE_NAME), cleanupFailure);

            // The IAM/SNS/SQS clients are created by this method, so this method releases them.
            // queryClient and s3Client belong to the caller and are left open.
            cleanupFailure = runScheduledQueryCleanupStep("close IAM client", iamClient::close, cleanupFailure);
            cleanupFailure = runScheduledQueryCleanupStep("close SNS client", snsClient::close, cleanupFailure);
            cleanupFailure = runScheduledQueryCleanupStep("close SQS client", sqsClient::close, cleanupFailure);

            // Restore the interrupt status only now: setting it before cleanup could cause the
            // blocking calls made during cleanup to be aborted.
            if (interrupted) {
                Thread.currentThread().interrupt();
            }

            if (cleanupFailure != null) {
                if (primaryFailure == null) {
                    throw cleanupFailure;
                }
                // Keep the original service error as the exception the caller sees.
                if (primaryFailure != cleanupFailure) {
                    primaryFailure.addSuppressed(cleanupFailure);
                }
            }
        }
    }

    /**
     * Runs a single cleanup action, logging instead of propagating a runtime failure so that the
     * remaining cleanup actions still get a chance to run.
     *
     * @param description  short label of the step, used in the log line
     * @param step         the cleanup action to run
     * @param firstFailure the first failure recorded by earlier steps, or {@code null} if none
     * @return the first failure seen so far; later failures are attached to it as suppressed
     */
    private static RuntimeException runScheduledQueryCleanupStep(String description, Runnable step,
            RuntimeException firstFailure) {
        try {
            step.run();
        } catch (RuntimeException e) {
            System.out.println("Cleanup step failed (" + description + "): " + e.getMessage());
            if (firstFailure == null) {
                return e;
            }
            // addSuppressed rejects self-suppression, so guard against the same instance being rethrown.
            if (firstFailure != e) {
                firstFailure.addSuppressed(e);
            }
        }
        return firstFailure;
    }