func main()

in sample_apps_reinvent2021/goV2/scheduled-query-sample.go [24:279]


// main drives the Timestream scheduled-query sample end to end.
//
// It provisions the supporting AWS resources (S3 bucket, SNS topic, SQS queue,
// SNS subscription, IAM role and policy), creates the source and result
// Timestream databases/tables, ingests the sample CSV, creates a valid or a
// deliberately invalid scheduled query, executes it, polls the SQS queue for
// notifications, optionally dumps the result table to output.log, and finally
// deletes (or just lists) everything it created.
//
// Command-line flags:
//   - region: AWS region used for every client.
//   - csv_file_path: sample CSV ingested into the source table.
//   - delete_resources_after_execution_flag: delete created resources at the end.
//   - run_invalid_scheduled_query_flag: create the invalid scheduled query instead
//     of the valid one.
//
// NOTE(review): utils.HandleError is called with a trailing bool; calls passing
// true are assumed to abort the program on a non-nil error — confirm in utils.
func main() {
	databaseName := utils.DATABASE_NAME
	tableName := utils.TABLE_NAME
	sqSampleAppTopicName := "sq_sample_app_topic.fifo"
	sqSampleAppQueueName := "sq_sample_app_queue.fifo"
	sqResultsDatabaseName := utils.SQ_DATABASE_NAME
	sqResultsTableName := utils.SQ_TABLE_NAME

	region := flag.String("region", utils.REGION, "region")
	csvFilePath := flag.String("csv_file_path", utils.SAMPLE_DATA_CSV_FILE_PATH, "sample csv file path to"+
		" ingest records into the table used for scheduled query")
	deleteResourcesFlag := flag.Bool("delete_resources_after_execution_flag", true, "true/false to"+
		" delete created resources after execution of this code")
	runInvalidScheduledQueryFlag := flag.Bool("run_invalid_scheduled_query_flag", false, "false/true to"+
		" run valid/invalid scheduledQuery E2E flow")
	flag.Parse()

	// Validate the CSV path before creating any AWS resource.
	ok, err := utils.FileExists(*csvFilePath)
	if !ok {
		if errors.Is(err, os.ErrNotExist) {
			utils.HandleError(err, fmt.Sprintf("csv file path provided=%s doesn't exist. Please "+
				"provide with a valid csv file path\n", *csvFilePath), true)
		}
		utils.HandleError(err, fmt.Sprintf("csv file path provided=%s is invalid. Please "+
			"provide with a valid csv file path\n", *csvFilePath), true)
	}

	tr := utils.LoadHttpSettings()
	// Use the SDK's default configuration. The error is checked before the
	// returned config is customised so a failed load is never used.
	cfg, err := config.LoadDefaultConfig(context.TODO())
	utils.HandleError(err, "Failed to load config ", true)
	cfg.Region = *region
	cfg.HTTPClient = &http.Client{Transport: tr}

	writeSvc := timestreamwrite.NewFromConfig(cfg)
	querySvc := timestreamquery.NewFromConfig(cfg)

	snsSvc := sns.NewFromConfig(cfg)
	sqsSvc := sqs.NewFromConfig(cfg)
	s3Svc := s3.NewFromConfig(cfg)
	iamSvc := iam.NewFromConfig(cfg)
	stsSvc := sts.NewFromConfig(cfg)

	timestreamBuilder := utils.TimestreamBuilder{WriteSvc: writeSvc, QuerySvc: querySvc}
	timestreamDependencyHelper := utils.TimestreamDependencyHelper{
		SnsSvc: snsSvc, SqsSvc: sqsSvc, S3Svc: s3Svc, IamSvc: iamSvc, StsSvc: stsSvc}

	var topicArn, s3BucketName, queueUrl, queueArn, subscriptionArn, scheduledQueryArn string
	// Every resource created below is recorded here so it can be listed at the end.
	var createdResourcesList []utils.Resource

	// Make the bucket name unique by appending 5 random characters at the end.
	s3BucketName = utils.ERROR_CONFIGURATION_S3_BUCKET_NAME_PREFIX + utils.GenerateRandomStringWithSize(5)

	err = timestreamDependencyHelper.CreateS3Bucket(s3BucketName, *region)
	utils.HandleError(err, fmt.Sprintf("Failed to create S3Bucket %s ", s3BucketName), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "S3", Identifier: s3BucketName})

	// Create the SNS topic and SQS queue used for scheduled query notifications.
	topicArn, err = timestreamDependencyHelper.CreateSnsTopic(sqSampleAppTopicName)
	utils.HandleError(err, fmt.Sprintf("Failed to create sns topic %s ", sqSampleAppTopicName), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "SNS_TOPIC", Identifier: topicArn})

	queueUrl, err = timestreamDependencyHelper.CreateSqsQueue(sqSampleAppQueueName)
	utils.HandleError(err, fmt.Sprintf("Failed to create sqs queue %s ", sqSampleAppQueueName), true)

	// Need to wait at least 1s after creating the queue before using it.
	time.Sleep(2 * time.Second)

	queueArn, err = timestreamDependencyHelper.GetSqsQueueArn(queueUrl)
	utils.HandleError(err, "Failed to get sqs queue Arn ", true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "SQS_QUEUE", Identifier: queueArn})

	// Capture the subscription error so the check below inspects this call
	// rather than the (nil) error left over from the previous step.
	subscriptionArn, err = timestreamDependencyHelper.SubscribeToSnsTopic(topicArn, queueArn)
	utils.HandleError(err, "Failed to subscribe sqs queue to sns topic ", true)
	createdResourcesList = append(createdResourcesList, utils.Resource{
		Type:              "SNS_SUBSCRIPTION",
		AdditionalDetails: fmt.Sprintf("TOPIC_ARN='%s' QUEUE_ARN='%s'", topicArn, queueArn),
		Identifier:        subscriptionArn,
	})

	err = timestreamDependencyHelper.SetSqsAccessPolicy(queueUrl, topicArn, queueArn)
	utils.HandleError(err, "Failed to set sqs policy ", true)

	roleArn, err := timestreamDependencyHelper.CreateIamRole(utils.ROLE_NAME)
	utils.HandleError(err, fmt.Sprintf("Failed to create iam role with name %s ", utils.ROLE_NAME), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "IAM_ROLE", Identifier: roleArn})

	policyArn, err := timestreamDependencyHelper.CreateIamPolicy(utils.POLICY_NAME)
	utils.HandleError(err, fmt.Sprintf("Failed to create iam policy with name %s ", utils.POLICY_NAME), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "IAM_POLICY", Identifier: policyArn})

	// NOTE(review): assumes AttachIamPolicy returns an error, as the HandleError
	// call that follows it implies — confirm against utils.
	err = timestreamDependencyHelper.AttachIamPolicy(utils.ROLE_NAME, policyArn)
	utils.HandleError(err, fmt.Sprintf("Failed to attach iam role with name %s to the policy with name %s ",
		utils.ROLE_NAME, policyArn), true)

	fmt.Println("Waiting for 15 seconds for newly created role to become active")
	time.Sleep(15 * time.Second)

	// Scheduled Query Activities

	// Create the source database and table. Creation errors are reported with
	// the non-fatal flag so the previous continue-on-error flow is preserved
	// while the failure is no longer silently overwritten.
	err = timestreamBuilder.CreateDatabase(databaseName)
	utils.HandleError(err, fmt.Sprintf("Failed to create database %s ", databaseName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_DATABASE", Identifier: databaseName})

	err = timestreamBuilder.CreateTable(databaseName, tableName, s3BucketName)
	utils.HandleError(err, fmt.Sprintf("Failed to create table %s ", tableName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_TABLE", Identifier: tableName})

	// Create the database and table that store scheduled query results.
	err = timestreamBuilder.CreateDatabase(sqResultsDatabaseName)
	utils.HandleError(err, fmt.Sprintf("Failed to create database %s ", sqResultsDatabaseName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_DATABASE", Identifier: sqResultsDatabaseName})

	err = timestreamBuilder.CreateTable(sqResultsDatabaseName, sqResultsTableName, s3BucketName)
	utils.HandleError(err, fmt.Sprintf("Failed to create table %s ", sqResultsTableName), false)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_TABLE", Identifier: sqResultsTableName})

	timestreamBuilder.IngestRecordsFromCsv(*csvFilePath, databaseName, tableName)

	// Switch between the valid and invalid query to exercise the happy-case
	// and failure scenarios.
	if !*runInvalidScheduledQueryFlag {
		fmt.Println("Creating a valid Scheduled Query Flow")
		scheduledQueryArn, err = timestreamBuilder.CreateValidScheduledQuery(topicArn, roleArn, s3BucketName, sqResultsDatabaseName, sqResultsTableName, databaseName, tableName)
	} else {
		fmt.Println("Creating a in-valid Scheduled Query Flow")
		scheduledQueryArn, err = timestreamBuilder.CreateInvalidScheduledQuery(topicArn, roleArn, s3BucketName, sqResultsDatabaseName, sqResultsTableName)
	}
	utils.HandleError(err, "Failed to create scheduled query ", true)

	// Listing is informational only; a failure here does not stop the flow.
	scheduledQueries, err := timestreamBuilder.ListScheduledQueries()
	if err == nil {
		fmt.Printf("Total scheduledQueries size : %d\n", len(scheduledQueries))
	}

	err = timestreamBuilder.DescribeScheduledQuery(scheduledQueryArn)
	utils.HandleError(err, fmt.Sprintf("Failed to describe scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	err = timestreamBuilder.ExecuteScheduledQuery(scheduledQueryArn, time.Now())
	utils.HandleError(err, fmt.Sprintf("Failed to execute scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	err = timestreamBuilder.DescribeScheduledQuery(scheduledQueryArn)
	utils.HandleError(err, fmt.Sprintf("Failed to describe scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	// Sleep for 65 seconds to let the scheduled query run.
	fmt.Printf("Waiting for 65 seconds for automatic ScheduledQuery executions & notifications from queueUrl %s\n", queueUrl)
	time.Sleep(65 * time.Second)

	didQuerySucceedManually := false
	wasQueryTriggeredAsExpected := false
	queryFailed := false

	// Poll the queue up to 10 times, stopping early once both a manual and an
	// automatic success have been seen, or as soon as any failure is reported.
	for i := 0; i < 10; i++ {
		response, recvErr := timestreamDependencyHelper.ReceiveMessage(queueUrl)
		// %v rather than %s so that a nil error prints as <nil>.
		fmt.Printf("response %+v, err %v\n", response, recvErr)

		if !response.ParseFlag {
			continue
		}
		timestreamDependencyHelper.DeleteMessage(queueUrl, response.ReceiptHandle)

		switch response.MessageAttributes.NotificationType.Value {
		case utils.SCHEDULED_QUERY_CREATING, utils.SCHEDULED_QUERY_CREATED:
			// Lifecycle notifications that need no action.
		case utils.MANUAL_TRIGGER_SUCCESS:
			fmt.Println("Manual execution of Scheduled Query succeeded")
			didQuerySucceedManually = true
		case utils.AUTO_TRIGGER_SUCCESS:
			fmt.Println("Scheduled Query was triggered as expected. Now triggering another run manually")
			wasQueryTriggeredAsExpected = true
		case utils.AUTO_TRIGGER_FAILURE, utils.MANUAL_TRIGGER_FAILURE:
			queryFailed = true
			runSummary := response.Message.ScheduledQueryRunSummary
			fmt.Printf("Failure Reason from SQS Notification:: %s\n", runSummary.FailureReason)
			fmt.Printf("ErrorReportLocation Details: %+v\n", runSummary.ErrorReportLocation)
			// Distinct names so the outer s3BucketName (needed for cleanup) is not shadowed.
			reportBucket := runSummary.ErrorReportLocation.S3ReportLocation.BucketName
			reportKey := runSummary.ErrorReportLocation.S3ReportLocation.ObjectKey
			if reportBucket != "" && reportKey != "" {
				timestreamDependencyHelper.ParseS3ErrorReport(reportBucket, reportKey)
			}
		case utils.SCHEDULED_QUERY_DELETED, utils.SCHEDULED_QUERY_UPDATED:
			fallthrough
		default:
			fmt.Printf("response = %+v\n", response)
		}

		if (didQuerySucceedManually && wasQueryTriggeredAsExpected) || queryFailed {
			break
		}
	}

	outputFile := "output.log"
	f, err := os.Create(outputFile)
	utils.HandleError(err, fmt.Sprintf("Unable to create file %s", outputFile), true)
	defer f.Close()

	// Only read the result table when at least one run reported success.
	if wasQueryTriggeredAsExpected || didQuerySucceedManually {
		fmt.Println("Fetching Scheduled Query execution results")
		queryString := fmt.Sprintf("SELECT * FROM %s.%s", sqResultsDatabaseName, sqResultsTableName)
		utils.RunQuery(&queryString, timestreamBuilder.QuerySvc, f, int32(1000))
	}

	err = timestreamBuilder.UpdateScheduledQuery(scheduledQueryArn)
	utils.HandleError(err, fmt.Sprintf("Failed to update scheduled query with scheduledQueryArn %s ", scheduledQueryArn), false)

	if *deleteResourcesFlag {
		reader := bufio.NewReader(os.Stdin)
		fmt.Println("deleteResourcesFlag is set to true, Deleting all the resources that got created " +
			" as part of this sample, hit any key to continue")
		// The input is only a pause for confirmation; its content and any read
		// error (e.g. EOF on a closed stdin) are deliberately ignored.
		_, _ = reader.ReadString('\n')

		fmt.Println("Cleaning the following resources")
		for _, resource := range createdResourcesList {
			fmt.Printf("\tResource Type : %s, Identifier (Arn/Name) : %s\n", resource.Type, resource.Identifier)
		}
		if scheduledQueryArn != "" {
			timestreamBuilder.DeleteScheduledQuery(scheduledQueryArn)
		}

		scheduledQueries, err = timestreamBuilder.ListScheduledQueries()
		if err == nil {
			fmt.Printf("scheduledQueries Size after deleting current ScheduledQuery: %d\n", len(scheduledQueries))
		}

		if s3BucketName != "" {
			timestreamDependencyHelper.DeleteS3Bucket(s3BucketName)
		}

		if policyArn != "" {
			timestreamDependencyHelper.DetachIamPolicy(utils.ROLE_NAME, policyArn)
			timestreamDependencyHelper.DeleteIamPolicy(policyArn)
			timestreamDependencyHelper.DeleteIamRole(utils.ROLE_NAME)
		}

		if subscriptionArn != "" {
			timestreamDependencyHelper.UnsubscribeToSnsTopic(subscriptionArn)
			// Look the queue URL up again by name; distinct names avoid
			// shadowing the outer queueUrl and err.
			currentQueueUrl, lookupErr := timestreamDependencyHelper.GetSqsQueryUrl(sqSampleAppQueueName)
			if lookupErr == nil && currentQueueUrl != "" {
				timestreamDependencyHelper.DeleteSqsQueue(currentQueueUrl)
			}
		}

		if topicArn != "" {
			timestreamDependencyHelper.DeleteSnsTopic(topicArn)
		}

		// Tables are deleted before the databases that contain them.
		timestreamBuilder.DeleteTable(sqResultsDatabaseName, sqResultsTableName)
		timestreamBuilder.DeleteDatabase(sqResultsDatabaseName)
		timestreamBuilder.DeleteTable(databaseName, tableName)
		timestreamBuilder.DeleteDatabase(databaseName)
	} else if len(createdResourcesList) > 0 {
		fmt.Println("Following Resources are created and not cleaned")
		for _, resource := range createdResourcesList {
			fmt.Printf("\tResource Type : %s, Identifier (Arn/Name) : %s\n", resource.Type, resource.Identifier)
		}
	}
}