in sample_apps_reinvent2021/dotnet/ScheduledQueryExample.cs [110:324]
public async Task RunScheduledQueryExample(bool skipDeletion, Dictionary<string, string> resourcesCreated)
{
try
{
resourcesCreated = await CreateResourcesForScheduledQuery(resourcesCreated);
//Scheduled Query Activities
/* Switch between Valid and Invalid Query to test Happy-case and Failure scenarios */
string scheduledQueryArn = await CreateValidScheduledQuery(
resourcesCreated[ScheduledQueryConstants.TopicArn],
resourcesCreated[ScheduledQueryConstants.RoleArn], ScheduledQueryConstants.SqDatabaseName,
ScheduledQueryConstants.SqTableName, resourcesCreated[AwsResourceConstant.BucketName]);
resourcesCreated.Add(ScheduledQueryConstants.ScheduledQueryArn, scheduledQueryArn);
// string scheduledQueryArn = await CreateInvalidScheduledQuery(
// resourcesCreated[ScheduledQueryConstants.TopicArn],
// resourcesCreated[ScheduledQueryConstants.RoleArn], ScheduledQueryConstants.SqDatabaseName,
// ScheduledQueryConstants.SqTableName, resourcesCreated[ScheduledQueryConstants.BucketName]);
await ListScheduledQueries();
await DescribeScheduledQuery(scheduledQueryArn);
// Sleep for 65 seconds to let ScheduledQuery run
Console.WriteLine("Waiting 65 seconds for automatic ScheduledQuery runs & notifications");
Thread.Sleep(65000);
await ProcessScheduledQueryNotifications(scheduledQueryArn);
await UpdateScheduledQuery(scheduledQueryArn, ScheduledQueryState.DISABLED);
}
catch (Exception e)
{
Console.WriteLine($"Failed to run Scheduled Query example: {e}");
}
finally
{
if (skipDeletion)
{
Console.WriteLine("The following resources created as part of sample application will have to be deleted manually");
foreach (KeyValuePair<string, string> kvp in resourcesCreated)
{
Console.WriteLine("{0}: {1}", kvp.Key, kvp.Value);
}
}
else
{
await CleanupResources();
}
}
async Task ProcessScheduledQueryNotifications(string scheduledQueryArn)
{
var didQuerySucceedManually = false;
var wasQueryTriggeredAsExpected = false;
var queryFailed = false;
// Reading up to 10 messages in the SQS queue
for (int i = 0; i < 10; i++)
{
ReceiveMessageResponse response =
await _timestreamDependencyHelper.GetMessage(sqsClient,
resourcesCreated[ScheduledQueryConstants.QueueUrl]);
if (!response.Messages.Any())
{
continue;
}
Console.WriteLine($"\nMessage body of {response.Messages[0].MessageId}:");
string messageBody = response.Messages[0].Body;
Console.WriteLine($"{messageBody}");
if (messageBody == null)
{
continue;
}
JObject sqsMessage = JObject.Parse(messageBody);
JsonSerializerSettings jsonSerializerSettings = new JsonSerializerSettings
{
NullValueHandling = NullValueHandling.Ignore
};
string notificationType =
sqsMessage["MessageAttributes"][AwsResourceConstant.NotificationType]["Value"].Value<string>();
switch (notificationType)
{
case "SCHEDULED_QUERY_CREATING":
// Scheduled Query is still pending to run.
Console.WriteLine("SCHEDULED_QUERY_CREATING notification is received");
break;
case "SCHEDULED_QUERY_CREATED":
// Scheduled Query is still pending to run.
Console.WriteLine("SCHEDULED_QUERY_CREATED notification is received");
break;
case "MANUAL_TRIGGER_SUCCESS":
Console.WriteLine("Manual run of Scheduled Query succeeded");
didQuerySucceedManually = true;
break;
case "AUTO_TRIGGER_SUCCESS":
Console.WriteLine(
"Scheduled Query was triggered as expected. Now triggering another run manually");
wasQueryTriggeredAsExpected = true;
ExecutionNotificationMessage successExecutionNotificationMessage =
JsonConvert.DeserializeObject<ExecutionNotificationMessage>(sqsMessage["Message"]
.Value<string>(), jsonSerializerSettings);
DateTimeOffset dateTimeOffset =
DateTimeOffset.FromUnixTimeSeconds(successExecutionNotificationMessage
.NextInvocationEpochSecond);
await ExecuteScheduledQuery(scheduledQueryArn, dateTimeOffset.DateTime);
break;
case "AUTO_TRIGGER_FAILURE":
// Fall-through
case "MANUAL_TRIGGER_FAILURE":
queryFailed = true;
ExecutionNotificationMessage failureExecutionNotificationMessage =
JsonConvert.DeserializeObject<ExecutionNotificationMessage>(
sqsMessage["Message"].Value<string>(), jsonSerializerSettings);
string failureReason = failureExecutionNotificationMessage.ScheduledQueryRunSummary
.FailureReason;
Console.WriteLine($"Failure Reason from SQS Notification:: {failureReason}");
ErrorReportLocation errorReportLocation = failureExecutionNotificationMessage
.ScheduledQueryRunSummary.ErrorReportLocation;
if (errorReportLocation != null)
{
// Error Notification has Error report associated with it. We can parse it.
string s3ObjectKey = errorReportLocation.S3ReportLocation.ObjectKey;
await ParseS3ErrorReport(resourcesCreated[AwsResourceConstant.BucketName],
s3ObjectKey);
}
break;
case "SCHEDULED_QUERY_DELETED":
// Fall-through
Console.WriteLine("SCHEDULED_QUERY_DELETED notification is received");
break;
case "SCHEDULED_QUERY_UPDATED":
// Fall-through
Console.WriteLine("SCHEDULED_QUERY_UPDATED notification is received");
break;
default:
Console.WriteLine("Received an unexpected message:: " + sqsMessage);
break;
}
// Since its a FIFO queue, we need to delete the message to be able to receive further messages
await _timestreamDependencyHelper.DeleteMessage(sqsClient, response.Messages[0],
resourcesCreated[ScheduledQueryConstants.QueueUrl]);
if ((didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed)
{
break;
}
}
if (wasQueryTriggeredAsExpected || didQuerySucceedManually)
{
Console.WriteLine("Fetching Scheduled Query run results");
await _queryRunner.RunQueryAsync("SELECT * FROM " + ScheduledQueryConstants.SqDatabaseName + "." +
ScheduledQueryConstants.SqTableName);
}
}
async Task CleanupResources()
{
//Clean up for scheduled query
if (resourcesCreated.ContainsKey(ScheduledQueryConstants.ScheduledQueryArn))
{
await DeleteScheduledQuery(resourcesCreated[ScheduledQueryConstants.ScheduledQueryArn]);
}
if (resourcesCreated.ContainsKey(ScheduledQueryConstants.PolicyArn))
{
await _timestreamDependencyHelper.DetachIamRolePolicy(iamClient,
ScheduledQueryConstants.IamRoleName,
resourcesCreated[ScheduledQueryConstants.PolicyArn]);
await _timestreamDependencyHelper.DeleteIamPolicy(iamClient,
resourcesCreated[ScheduledQueryConstants.PolicyArn]);
}
if (resourcesCreated.ContainsKey(ScheduledQueryConstants.RoleName))
{
await _timestreamDependencyHelper.DeleteIamRole(iamClient, ScheduledQueryConstants.IamRoleName);
}
if (resourcesCreated.ContainsKey(ScheduledQueryConstants.SubscriptionArn))
{
await _timestreamDependencyHelper.UnsubscribeFromSnsTopic(snsClient,
resourcesCreated[ScheduledQueryConstants.SubscriptionArn]);
}
if (resourcesCreated.ContainsKey(ScheduledQueryConstants.QueueUrl))
{
await _timestreamDependencyHelper.DeleteQueue(sqsClient, ScheduledQueryConstants.FifoQueueName);
}
if (resourcesCreated.ContainsKey(ScheduledQueryConstants.TopicArn))
{
await _timestreamDependencyHelper.DeleteSnsTopic(snsClient,
resourcesCreated[ScheduledQueryConstants.TopicArn]);
}
if (resourcesCreated.ContainsKey(AwsResourceConstant.BucketName))
{
await _timestreamDependencyHelper.DeleteS3Bucket(s3Client,
resourcesCreated[AwsResourceConstant.BucketName]);
}
await _timestreamCrudHelper.DeleteTable(_amazonTimestreamWrite, ScheduledQueryConstants.SqDatabaseName,
ScheduledQueryConstants.SqTableName);
await _timestreamCrudHelper.DeleteDatabase(_amazonTimestreamWrite,
ScheduledQueryConstants.SqDatabaseName);
}
}