public async Task RunScheduledQueryExample()

in sample_apps_reinvent2021/dotnet/ScheduledQueryExample.cs [110:324]


        /// <summary>
        /// Runs the Scheduled Query sample end to end: creates the dependent resources and a scheduled
        /// query, lists and describes it, waits for an automatic run, processes the resulting SQS
        /// notifications and disables the query. Afterwards the created resources are either deleted
        /// or listed on the console, depending on <paramref name="skipDeletion"/>.
        /// </summary>
        /// <param name="skipDeletion">
        /// When <c>true</c> the created resources are only printed so they can be removed manually;
        /// when <c>false</c> they are deleted in the <c>finally</c> block.
        /// </param>
        /// <param name="resourcesCreated">
        /// Map of resource identifiers created so far. It is passed to
        /// <c>CreateResourcesForScheduledQuery</c> and replaced by that method's result.
        /// </param>
        /// <returns>A task that completes once the example and the clean-up/listing step have finished.</returns>
        /// <remarks>
        /// Exceptions raised by the example steps are caught and written to the console. Exceptions
        /// raised by the clean-up in the <c>finally</c> block are not caught here.
        /// </remarks>
        public async Task RunScheduledQueryExample(bool skipDeletion, Dictionary<string, string> resourcesCreated)
        {
            try
            {
                resourcesCreated = await CreateResourcesForScheduledQuery(resourcesCreated);

                // Scheduled Query activities

                /* Switch between Valid and Invalid Query to test Happy-case and Failure scenarios */
                string scheduledQueryArn = await CreateValidScheduledQuery(
                    resourcesCreated[ScheduledQueryConstants.TopicArn],
                    resourcesCreated[ScheduledQueryConstants.RoleArn], ScheduledQueryConstants.SqDatabaseName,
                    ScheduledQueryConstants.SqTableName, resourcesCreated[AwsResourceConstant.BucketName]);

                // Indexer assignment (instead of Add) so that a pre-existing entry cannot throw after the
                // query has already been created, which would leave the new query untracked for clean-up.
                resourcesCreated[ScheduledQueryConstants.ScheduledQueryArn] = scheduledQueryArn;

                // string scheduledQueryArn = await CreateInvalidScheduledQuery(
                //     resourcesCreated[ScheduledQueryConstants.TopicArn],
                //     resourcesCreated[ScheduledQueryConstants.RoleArn], ScheduledQueryConstants.SqDatabaseName,
                //     ScheduledQueryConstants.SqTableName, resourcesCreated[AwsResourceConstant.BucketName]);

                await ListScheduledQueries();
                await DescribeScheduledQuery(scheduledQueryArn);

                // Wait for 65 seconds to let the ScheduledQuery run. Task.Delay is used so the wait
                // does not block a thread inside this async method.
                Console.WriteLine("Waiting 65 seconds for automatic ScheduledQuery runs & notifications");
                await Task.Delay(TimeSpan.FromSeconds(65));

                await ProcessScheduledQueryNotifications(scheduledQueryArn);

                await UpdateScheduledQuery(scheduledQueryArn, ScheduledQueryState.DISABLED);
            }
            catch (Exception e)
            {
                Console.WriteLine($"Failed to run Scheduled Query example: {e}");
            }
            finally
            {
                if (skipDeletion)
                {
                    Console.WriteLine("The following resources created as part of sample application will have to be deleted manually");
                    foreach (KeyValuePair<string, string> kvp in resourcesCreated)
                    {
                        Console.WriteLine("{0}: {1}", kvp.Key, kvp.Value);
                    }
                }
                else
                {
                    await CleanupResources();
                }
            }

            // Reads up to 10 messages from the SQS queue, reacts to each scheduled-query notification
            // type, and finally queries the target table when at least one run succeeded.
            async Task ProcessScheduledQueryNotifications(string scheduledQueryArn)
            {
                var didQuerySucceedManually = false;
                var wasQueryTriggeredAsExpected = false;
                var queryFailed = false;

                // Shared by every notification deserialization, so it is created once rather than per message.
                var jsonSerializerSettings = new JsonSerializerSettings
                {
                    NullValueHandling = NullValueHandling.Ignore
                };
                string queueUrl = resourcesCreated[ScheduledQueryConstants.QueueUrl];

                // Reading up to 10 messages in the SQS queue
                for (int i = 0; i < 10; i++)
                {
                    ReceiveMessageResponse response =
                        await _timestreamDependencyHelper.GetMessage(sqsClient, queueUrl);
                    if (!response.Messages.Any())
                    {
                        continue;
                    }

                    var message = response.Messages[0];
                    Console.WriteLine($"\nMessage body of {message.MessageId}:");
                    string messageBody = message.Body;
                    Console.WriteLine($"{messageBody}");

                    if (messageBody == null)
                    {
                        continue;
                    }

                    JObject sqsMessage = JObject.Parse(messageBody);

                    // Null-safe navigation: a message without the notification-type attribute yields
                    // null here and is reported by the default case instead of throwing.
                    string notificationType =
                        sqsMessage["MessageAttributes"]?[AwsResourceConstant.NotificationType]?["Value"]
                            ?.Value<string>();

                    switch (notificationType)
                    {
                        case "SCHEDULED_QUERY_CREATING":
                            // Scheduled Query is still pending to run.
                            Console.WriteLine("SCHEDULED_QUERY_CREATING notification is received");
                            break;
                        case "SCHEDULED_QUERY_CREATED":
                            // Scheduled Query is still pending to run.
                            Console.WriteLine("SCHEDULED_QUERY_CREATED notification is received");
                            break;
                        case "MANUAL_TRIGGER_SUCCESS":
                            Console.WriteLine("Manual run of Scheduled Query succeeded");
                            didQuerySucceedManually = true;
                            break;
                        case "AUTO_TRIGGER_SUCCESS":
                            Console.WriteLine(
                                "Scheduled Query was triggered as expected. Now triggering another run manually");
                            wasQueryTriggeredAsExpected = true;
                            ExecutionNotificationMessage successExecutionNotificationMessage =
                                JsonConvert.DeserializeObject<ExecutionNotificationMessage>(
                                    sqsMessage["Message"].Value<string>(), jsonSerializerSettings);
                            DateTimeOffset nextInvocation =
                                DateTimeOffset.FromUnixTimeSeconds(successExecutionNotificationMessage
                                    .NextInvocationEpochSecond);
                            // UtcDateTime carries DateTimeKind.Utc; the DateTime property would return
                            // Kind == Unspecified, which can be reinterpreted as local time downstream.
                            await ExecuteScheduledQuery(scheduledQueryArn, nextInvocation.UtcDateTime);
                            break;
                        case "AUTO_TRIGGER_FAILURE":
                        // Fall-through
                        case "MANUAL_TRIGGER_FAILURE":
                            queryFailed = true;
                            ExecutionNotificationMessage failureExecutionNotificationMessage =
                                JsonConvert.DeserializeObject<ExecutionNotificationMessage>(
                                    sqsMessage["Message"].Value<string>(), jsonSerializerSettings);
                            string failureReason = failureExecutionNotificationMessage.ScheduledQueryRunSummary
                                .FailureReason;

                            Console.WriteLine($"Failure Reason from SQS Notification:: {failureReason}");

                            ErrorReportLocation errorReportLocation = failureExecutionNotificationMessage
                                .ScheduledQueryRunSummary.ErrorReportLocation;

                            if (errorReportLocation != null)
                            {
                                // Error Notification has Error report associated with it. We can parse it.
                                string s3ObjectKey = errorReportLocation.S3ReportLocation.ObjectKey;
                                await ParseS3ErrorReport(resourcesCreated[AwsResourceConstant.BucketName],
                                    s3ObjectKey);
                            }

                            break;
                        case "SCHEDULED_QUERY_DELETED":
                            // Informational only; nothing else to do.
                            Console.WriteLine("SCHEDULED_QUERY_DELETED notification is received");
                            break;
                        case "SCHEDULED_QUERY_UPDATED":
                            // Informational only; nothing else to do.
                            Console.WriteLine("SCHEDULED_QUERY_UPDATED notification is received");
                            break;
                        default:
                            Console.WriteLine("Received an unexpected message:: " + sqsMessage);
                            break;
                    }

                    // Since its a FIFO queue, we need to delete the message to be able to receive further messages
                    await _timestreamDependencyHelper.DeleteMessage(sqsClient, message, queueUrl);

                    // Stop early once both an automatic and a manual run succeeded, or any run failed.
                    if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed)
                    {
                        break;
                    }
                }

                if (wasQueryTriggeredAsExpected || didQuerySucceedManually)
                {
                    Console.WriteLine("Fetching Scheduled Query run results");
                    await _queryRunner.RunQueryAsync(
                        $"SELECT * FROM {ScheduledQueryConstants.SqDatabaseName}.{ScheduledQueryConstants.SqTableName}");
                }
            }

            // Deletes every resource recorded in resourcesCreated, then the sample table and database.
            async Task CleanupResources()
            {
                //Clean up for scheduled query
                if (resourcesCreated.TryGetValue(ScheduledQueryConstants.ScheduledQueryArn, out var queryArn))
                {
                    await DeleteScheduledQuery(queryArn);
                }

                if (resourcesCreated.TryGetValue(ScheduledQueryConstants.PolicyArn, out var policyArn))
                {
                    // The policy is detached from the role before it is deleted.
                    await _timestreamDependencyHelper.DetachIamRolePolicy(iamClient,
                        ScheduledQueryConstants.IamRoleName, policyArn);
                    await _timestreamDependencyHelper.DeleteIamPolicy(iamClient, policyArn);
                }

                // NOTE(review): the rest of this method reads the role via ScheduledQueryConstants.RoleArn;
                // confirm that CreateResourcesForScheduledQuery actually stores a RoleName entry.
                if (resourcesCreated.ContainsKey(ScheduledQueryConstants.RoleName))
                {
                    await _timestreamDependencyHelper.DeleteIamRole(iamClient, ScheduledQueryConstants.IamRoleName);
                }

                if (resourcesCreated.TryGetValue(ScheduledQueryConstants.SubscriptionArn, out var subscriptionArn))
                {
                    await _timestreamDependencyHelper.UnsubscribeFromSnsTopic(snsClient, subscriptionArn);
                }

                if (resourcesCreated.ContainsKey(ScheduledQueryConstants.QueueUrl))
                {
                    await _timestreamDependencyHelper.DeleteQueue(sqsClient, ScheduledQueryConstants.FifoQueueName);
                }

                if (resourcesCreated.TryGetValue(ScheduledQueryConstants.TopicArn, out var topicArn))
                {
                    await _timestreamDependencyHelper.DeleteSnsTopic(snsClient, topicArn);
                }

                if (resourcesCreated.TryGetValue(AwsResourceConstant.BucketName, out var bucketName))
                {
                    await _timestreamDependencyHelper.DeleteS3Bucket(s3Client, bucketName);
                }

                // The table is deleted before the database that contains it.
                await _timestreamCrudHelper.DeleteTable(_amazonTimestreamWrite, ScheduledQueryConstants.SqDatabaseName,
                    ScheduledQueryConstants.SqTableName);
                await _timestreamCrudHelper.DeleteDatabase(_amazonTimestreamWrite,
                    ScheduledQueryConstants.SqDatabaseName);
            }
        }