in sample_apps_reinvent2021/goV2/scheduled-query-sample.go [24:279]
// main drives the scheduled-query sample end to end.
//
// It parses the command-line flags, validates the sample CSV path, builds the
// AWS service clients, and creates the supporting resources (S3 bucket, SNS
// topic, SQS queue, SNS subscription, IAM role and policy, and the Timestream
// source and result databases/tables). It then ingests the CSV, creates either
// a valid or an invalid scheduled query depending on
// -run_invalid_scheduled_query_flag, executes it manually, polls the SQS queue
// for scheduled-query notifications, optionally writes the query results to
// output.log, and finally deletes the created resources when
// -delete_resources_after_execution_flag is set (otherwise it lists them).
func main() {
	databaseName := utils.DATABASE_NAME
	tableName := utils.TABLE_NAME
	sqSampleAppTopicName := "sq_sample_app_topic.fifo"
	sqSampleAppQueueName := "sq_sample_app_queue.fifo"
	sqResultsDatabaseName := utils.SQ_DATABASE_NAME
	sqResultsTableName := utils.SQ_TABLE_NAME

	region := flag.String("region", utils.REGION, "region")
	csvFilePath := flag.String("csv_file_path", utils.SAMPLE_DATA_CSV_FILE_PATH, "sample csv file path to"+
		" ingest records into the table used for scheduled query")
	deleteResourcesFlag := flag.Bool("delete_resources_after_execution_flag", true, "true/false to"+
		" delete created resources after execution of this code")
	runInvalidScheduledQueryFlag := flag.Bool("run_invalid_scheduled_query_flag", false, "false/true to"+
		" run valid/invalid scheduledQuery E2E flow")
	flag.Parse()

	// Validate the CSV path up front, before any AWS resource is created.
	ok, err := utils.FileExists(*csvFilePath)
	if !ok {
		if errors.Is(err, os.ErrNotExist) {
			utils.HandleError(err, fmt.Sprintf("csv file path provided=%s doesn't exist. Please "+
				"provide with a valid csv file path\n", *csvFilePath), true)
		}
		utils.HandleError(err, fmt.Sprintf("csv file path provided=%s is invalid. Please "+
			"provide with a valid csv file path\n", *csvFilePath), true)
	}

	tr := utils.LoadHttpSettings()
	// Use the SDK's default configuration. The load error is checked before
	// cfg is customised so a failed load is reported first.
	cfg, err := config.LoadDefaultConfig(context.TODO())
	utils.HandleError(err, "Failed to load config ", true)
	cfg.Region = *region
	cfg.HTTPClient = &http.Client{Transport: tr}

	writeSvc := timestreamwrite.NewFromConfig(cfg)
	querySvc := timestreamquery.NewFromConfig(cfg)
	snsSvc := sns.NewFromConfig(cfg)
	sqsSvc := sqs.NewFromConfig(cfg)
	s3Svc := s3.NewFromConfig(cfg)
	iamSvc := iam.NewFromConfig(cfg)
	stsSvc := sts.NewFromConfig(cfg)
	timestreamBuilder := utils.TimestreamBuilder{WriteSvc: writeSvc, QuerySvc: querySvc}
	timestreamDependencyHelper := utils.TimestreamDependencyHelper{
		SnsSvc: snsSvc, SqsSvc: sqsSvc, S3Svc: s3Svc, IamSvc: iamSvc, StsSvc: stsSvc}

	var topicArn, s3BucketName, queueUrl, queueArn, subscriptionArn, scheduledQueryArn string
	// Every created resource is recorded here so it can be listed at the end.
	var createdResourcesList []utils.Resource

	// Make the bucket name unique by appending 5 random characters at the end.
	s3BucketName = utils.ERROR_CONFIGURATION_S3_BUCKET_NAME_PREFIX + utils.GenerateRandomStringWithSize(5)
	err = timestreamDependencyHelper.CreateS3Bucket(s3BucketName, *region)
	utils.HandleError(err, fmt.Sprintf("Failed to create S3Bucket %s ", s3BucketName), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "S3", Identifier: s3BucketName})

	// Create the SNS topic and SQS queue used for scheduled-query notifications.
	topicArn, err = timestreamDependencyHelper.CreateSnsTopic(sqSampleAppTopicName)
	utils.HandleError(err, fmt.Sprintf("Failed to create sns topic %s ", sqSampleAppTopicName), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "SNS_TOPIC", Identifier: topicArn})

	queueUrl, err = timestreamDependencyHelper.CreateSqsQueue(sqSampleAppQueueName)
	utils.HandleError(err, fmt.Sprintf("Failed to create sqs queue %s ", sqSampleAppQueueName), true)
	// Need to wait at least 1s after creating the queue before using it.
	time.Sleep(2 * time.Second)

	queueArn, err = timestreamDependencyHelper.GetSqsQueueArn(queueUrl)
	utils.HandleError(err, "Failed to get sqs queue Arn ", true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "SQS_QUEUE", Identifier: queueArn})

	// Capture the subscribe error; previously it was discarded and the stale
	// err from the preceding call was checked instead.
	subscriptionArn, err = timestreamDependencyHelper.SubscribeToSnsTopic(topicArn, queueArn)
	utils.HandleError(err, "Failed to subscribe sqs queue to sns topic ", true)
	createdResourcesList = append(createdResourcesList, utils.Resource{
		Type:              "SNS_SUBSCRIPTION",
		AdditionalDetails: fmt.Sprintf("TOPIC_ARN='%s' QUEUE_ARN='%s'", topicArn, queueArn),
		Identifier:        subscriptionArn,
	})

	err = timestreamDependencyHelper.SetSqsAccessPolicy(queueUrl, topicArn, queueArn)
	utils.HandleError(err, "Failed to set sqs policy ", true)

	roleArn, err := timestreamDependencyHelper.CreateIamRole(utils.ROLE_NAME)
	utils.HandleError(err, fmt.Sprintf("Failed to create iam role with name %s ", utils.ROLE_NAME), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "IAM_ROLE", Identifier: roleArn})

	// Capture the policy-creation error rather than re-checking the role error.
	policyArn, err := timestreamDependencyHelper.CreateIamPolicy(utils.POLICY_NAME)
	utils.HandleError(err, fmt.Sprintf("Failed to create iam policy with name %s ", utils.POLICY_NAME), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "IAM_POLICY", Identifier: policyArn})

	// NOTE(review): assumes AttachIamPolicy returns an error, as the HandleError
	// call that follows it implies — confirm against utils.
	err = timestreamDependencyHelper.AttachIamPolicy(utils.ROLE_NAME, policyArn)
	utils.HandleError(err, fmt.Sprintf("Failed to attach iam role with name %s to the policy with name %s ",
		utils.ROLE_NAME, policyArn), true)

	fmt.Println("Waiting for 15 seconds for newly created role to become active")
	time.Sleep(15 * time.Second)

	// Scheduled Query Activities.
	// Creation errors are reported instead of being silently overwritten. They
	// are passed with false, like the describe/execute calls below, so the flow
	// continues (assumes HandleError only aborts when that flag is true —
	// confirm in utils).
	// Create the source database and table.
	err = timestreamBuilder.CreateDatabase(databaseName)
	utils.HandleError(err, fmt.Sprintf("Failed to create database %s ", databaseName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_DATABASE", Identifier: databaseName})

	err = timestreamBuilder.CreateTable(databaseName, tableName, s3BucketName)
	utils.HandleError(err, fmt.Sprintf("Failed to create table %s in database %s ", tableName, databaseName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_TABLE", Identifier: tableName})

	// Create the database and table that store scheduled query results.
	err = timestreamBuilder.CreateDatabase(sqResultsDatabaseName)
	utils.HandleError(err, fmt.Sprintf("Failed to create database %s ", sqResultsDatabaseName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_DATABASE", Identifier: sqResultsDatabaseName})

	err = timestreamBuilder.CreateTable(sqResultsDatabaseName, sqResultsTableName, s3BucketName)
	utils.HandleError(err, fmt.Sprintf("Failed to create table %s in database %s ", sqResultsTableName, sqResultsDatabaseName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_TABLE", Identifier: sqResultsTableName})

	timestreamBuilder.IngestRecordsFromCsv(*csvFilePath, databaseName, tableName)

	// Switch between a valid and an invalid query to test the happy-case and
	// failure scenarios.
	if !*runInvalidScheduledQueryFlag {
		fmt.Println("Creating a valid Scheduled Query Flow")
		scheduledQueryArn, err = timestreamBuilder.CreateValidScheduledQuery(topicArn, roleArn, s3BucketName,
			sqResultsDatabaseName, sqResultsTableName, databaseName, tableName)
	} else {
		fmt.Println("Creating a in-valid Scheduled Query Flow")
		scheduledQueryArn, err = timestreamBuilder.CreateInvalidScheduledQuery(topicArn, roleArn, s3BucketName,
			sqResultsDatabaseName, sqResultsTableName)
	}
	utils.HandleError(err, "Failed to create scheduled query ", true)

	scheduledQueries, err := timestreamBuilder.ListScheduledQueries()
	if err == nil {
		fmt.Printf("Total scheduledQueries size : %d\n", len(scheduledQueries))
	}

	err = timestreamBuilder.DescribeScheduledQuery(scheduledQueryArn)
	utils.HandleError(err, fmt.Sprintf("Failed to describe scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	err = timestreamBuilder.ExecuteScheduledQuery(scheduledQueryArn, time.Now())
	utils.HandleError(err, fmt.Sprintf("Failed to execute scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	err = timestreamBuilder.DescribeScheduledQuery(scheduledQueryArn)
	utils.HandleError(err, fmt.Sprintf("Failed to describe scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	// Sleep for 65 seconds to let the scheduled query run.
	fmt.Printf("Waiting for 65 seconds for automatic ScheduledQuery executions & notifications from queueUrl %s\n", queueUrl)
	time.Sleep(65 * time.Second)

	didQuerySucceedManually := false
	wasQueryTriggeredAsExpected := false
	queryFailed := false

	// Poll the queue up to 10 times, stopping early once both a manual and an
	// automatic success were seen, or as soon as any failure is reported.
	for i := 0; i < 10; i++ {
		response, err := timestreamDependencyHelper.ReceiveMessage(queueUrl)
		// %v (not %s) so a nil error prints as <nil> instead of %!s(<nil>).
		fmt.Printf("response %+v, err %v\n", response, err)
		// Skip failed receives before inspecting the response, and skip
		// messages whose ParseFlag is not set.
		if err != nil || !response.ParseFlag {
			continue
		}
		timestreamDependencyHelper.DeleteMessage(queueUrl, response.ReceiptHandle)

		switch response.MessageAttributes.NotificationType.Value {
		case utils.SCHEDULED_QUERY_CREATING, utils.SCHEDULED_QUERY_CREATED:
			// Lifecycle notifications need no action.
		case utils.MANUAL_TRIGGER_SUCCESS:
			fmt.Println("Manual execution of Scheduled Query succeeded")
			didQuerySucceedManually = true
		case utils.AUTO_TRIGGER_SUCCESS:
			fmt.Println("Scheduled Query was triggered as expected. Now triggering another run manually")
			wasQueryTriggeredAsExpected = true
		case utils.AUTO_TRIGGER_FAILURE, utils.MANUAL_TRIGGER_FAILURE:
			queryFailed = true
			runSummary := response.Message.ScheduledQueryRunSummary
			fmt.Printf("Failure Reason from SQS Notification:: %s\n", runSummary.FailureReason)
			fmt.Printf("ErrorReportLocation Details: %+v\n", runSummary.ErrorReportLocation)
			// Distinct names avoid shadowing the outer s3BucketName, which is
			// still needed during cleanup.
			reportBucket := runSummary.ErrorReportLocation.S3ReportLocation.BucketName
			reportKey := runSummary.ErrorReportLocation.S3ReportLocation.ObjectKey
			if reportBucket != "" && reportKey != "" {
				timestreamDependencyHelper.ParseS3ErrorReport(reportBucket, reportKey)
			}
		case utils.SCHEDULED_QUERY_DELETED, utils.SCHEDULED_QUERY_UPDATED:
			// Handled the same way as unrecognised notifications.
			fallthrough
		default:
			fmt.Printf("response = %+v\n", response)
		}

		if (didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed {
			break
		}
	}

	outputFile := "output.log"
	f, err := os.Create(outputFile)
	utils.HandleError(err, fmt.Sprintf("Unable to create file %s", outputFile), true)
	defer f.Close()

	if wasQueryTriggeredAsExpected || didQuerySucceedManually {
		fmt.Println("Fetching Scheduled Query execution results")
		queryString := fmt.Sprintf("SELECT * FROM %s.%s", sqResultsDatabaseName, sqResultsTableName)
		utils.RunQuery(&queryString, timestreamBuilder.QuerySvc, f, int32(1000))
	}

	err = timestreamBuilder.UpdateScheduledQuery(scheduledQueryArn)
	utils.HandleError(err, fmt.Sprintf("Failed to update scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	if *deleteResourcesFlag {
		reader := bufio.NewReader(os.Stdin)
		fmt.Println("deleteResourcesFlag is set to true, Deleting all the resources that got created " +
			" as part of this sample, hit any key to continue")
		// Block until a newline is read from stdin; the text itself is not used.
		reader.ReadString('\n')

		fmt.Println("Cleaning the following resources")
		for _, resource := range createdResourcesList {
			fmt.Printf("\tResource Type : %s, Identifier (Arn/Name) : %s\n", resource.Type, resource.Identifier)
		}

		if scheduledQueryArn != "" {
			timestreamBuilder.DeleteScheduledQuery(scheduledQueryArn)
		}
		scheduledQueries, err = timestreamBuilder.ListScheduledQueries()
		if err == nil {
			fmt.Printf("scheduledQueries Size after deleting current ScheduledQuery: %d\n", len(scheduledQueries))
		}

		if s3BucketName != "" {
			timestreamDependencyHelper.DeleteS3Bucket(s3BucketName)
		}

		if policyArn != "" {
			timestreamDependencyHelper.DetachIamPolicy(utils.ROLE_NAME, policyArn)
			timestreamDependencyHelper.DeleteIamPolicy(policyArn)
			timestreamDependencyHelper.DeleteIamRole(utils.ROLE_NAME)
		}

		if subscriptionArn != "" {
			timestreamDependencyHelper.UnsubscribeToSnsTopic(subscriptionArn)
			// The queue URL is looked up again by name; distinct variable
			// names keep the outer queueUrl and err unshadowed.
			cleanupQueueUrl, lookupErr := timestreamDependencyHelper.GetSqsQueryUrl(sqSampleAppQueueName)
			if lookupErr == nil && cleanupQueueUrl != "" {
				timestreamDependencyHelper.DeleteSqsQueue(cleanupQueueUrl)
			}
		}

		if topicArn != "" {
			timestreamDependencyHelper.DeleteSnsTopic(topicArn)
		}

		// Each table is deleted before its database.
		timestreamBuilder.DeleteTable(sqResultsDatabaseName, sqResultsTableName)
		timestreamBuilder.DeleteDatabase(sqResultsDatabaseName)
		timestreamBuilder.DeleteTable(databaseName, tableName)
		timestreamBuilder.DeleteDatabase(databaseName)
	} else if len(createdResourcesList) > 0 {
		fmt.Println("Following Resources are created and not cleaned")
		for _, resource := range createdResourcesList {
			fmt.Printf("\tResource Type : %s, Identifier (Arn/Name) : %s\n", resource.Type, resource.Identifier)
		}
	}
}