func main()

in sample_apps_reinvent2021/goV2/ingestion-csv-sample.go [24:145]


func main() {

	databaseName := flag.String("database_name", utils.DATABASE_NAME, "database name string")
	tableName := flag.String("table_name", utils.TABLE_NAME, "table name string")
	testFileName := flag.String("test_file", utils.SAMPLE_DATA_CSV_FILE_PATH, "CSV file containing the data to ingest")
	region := flag.String("region", utils.REGION, "region")

	flag.Parse()

	// Use the SDK's default configuration.
	cfg, err := config.LoadDefaultConfig(context.TODO())
	tr := utils.LoadHttpSettings()

	writeSvc := timestreamwrite.NewFromConfig(cfg, func(o *timestreamwrite.Options) {
		o.Region = *region
		o.HTTPClient = &http.Client{Transport: tr}
	})
	querySvc := timestreamquery.NewFromConfig(cfg, func(o *timestreamquery.Options) {
		o.Region = *region
		o.HTTPClient = &http.Client{Transport: tr}
	})
	timestreamBuilder := utils.TimestreamBuilder{WriteSvc: writeSvc, QuerySvc: querySvc}

	s3Svc := s3.NewFromConfig(cfg, func(o *s3.Options) {
		o.Region = *region
	})
	timestreamDependencyHelper := utils.TimestreamDependencyHelper{S3Svc: s3Svc}

	createdResourcesList := []utils.Resource{}
	// Make the bucket name unique by appending 5 random characters at the end
	s3BucketName := utils.ERROR_CONFIGURATION_S3_BUCKET_NAME_PREFIX + utils.GenerateRandomStringWithSize(5)
	err = timestreamDependencyHelper.CreateS3Bucket(s3BucketName, *region)
	utils.HandleError(err, fmt.Sprintf("Failed to create S3Bucket %s ", s3BucketName), true)
	createdResourcesList = append(createdResourcesList, utils.Resource{Type: "S3", Identifier: s3BucketName})
	var RESOURCE_NOT_FOUND *types.ResourceNotFoundException
	// Describe database.
	err = timestreamBuilder.DescribeDatabase(*databaseName)
	if err != nil {
		if errors.As(err, &RESOURCE_NOT_FOUND) {
			err = timestreamBuilder.CreateDatabase(*databaseName)
			if err != nil {
				utils.CleanUp(timestreamBuilder, timestreamDependencyHelper, "", "", s3BucketName)
				utils.HandleError(err, fmt.Sprintf("Failed to create Database %s\n", *databaseName), true)
			}
			createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_DATABASE", Identifier: *databaseName})
		}
	}

	// Describe table.
	_, err = timestreamBuilder.DescribeTable(*databaseName, *tableName)
	if err != nil {
		if errors.As(err, &RESOURCE_NOT_FOUND) {
			err = timestreamBuilder.CreateTable(*databaseName, *tableName, s3BucketName)
			if err != nil {
				utils.CleanUp(timestreamBuilder, timestreamDependencyHelper, *databaseName, "", s3BucketName)
				utils.HandleError(err, fmt.Sprintf("Failed to create table %s in database %s ",
					*tableName, *databaseName), true)
			}
			createdResourcesList = append(createdResourcesList, utils.Resource{Type: "TIMESTREAM_TABLE", Identifier: *tableName})
		}
	} else {
		fmt.Println("Table already exists")
		fmt.Printf("Deleting created s3Bucket=%s as it was not used for creating the table.", s3BucketName)
		timestreamDependencyHelper.DeleteS3Bucket(s3BucketName)
		if len(createdResourcesList) > 0 && createdResourcesList[0].Identifier == s3BucketName {
			createdResourcesList = createdResourcesList[1:]
		}
	}

	total_records := int64(0)
	// sample query
	queryString := fmt.Sprintf("select count(*) as total_records from %s.%s", *databaseName, *tableName)
	// execute the query
	queryOutput, err := timestreamBuilder.QueryWithQueryString(queryString)
	if err != nil {
		fmt.Printf("Error while querying: %s", err.Error())
		utils.HandleError(err, fmt.Sprintf("Failed to query from table %s in database %s ",
			*tableName, *databaseName), true)
	} else {
		utils.ParseQueryResult(queryOutput, nil)
		if len(queryOutput.Rows) > 0 {
			total_records, _ = strconv.ParseInt(*queryOutput.Rows[0].Data[0].ScalarValue, 10, 64)
		}
	}

	if total_records >= 63000 {
		fmt.Println("Records are already ingested into database, Skipping Ingestion from csv provided")
	} else {
		//Ingest records from csv file
		err = timestreamBuilder.IngestRecordsFromCsv(*testFileName, *databaseName, *tableName)
		if err != nil {
			utils.CleanUp(timestreamBuilder, timestreamDependencyHelper, *databaseName, *tableName, s3BucketName)
			utils.HandleError(err, fmt.Sprintf("Failed to ingest data from csv path `%s` table %s in database %s ", *testFileName),
				true)
		} else {
			fmt.Println("Ingesting Records Complete")
		}

		// sample query
		queryString := fmt.Sprintf("select count(*) as total_records from %s.%s", *databaseName, *tableName)
		// execute the query
		queryOutput, err := timestreamBuilder.QueryWithQueryString(queryString)
		if err != nil {
			fmt.Printf("Error while querying: %s", err.Error())
		} else {
			utils.ParseQueryResult(queryOutput, nil)
			if len(queryOutput.Rows) > 0 {
				total_records, _ := strconv.ParseInt(*queryOutput.Rows[0].Data[0].ScalarValue, 10, 64)
				fmt.Println("Total ingested records count : ", total_records)
			}
		}
	}

	if len(createdResourcesList) > 0 {
		fmt.Println("Following Resources are created and not cleaned")
		for _, resource := range createdResourcesList {
			fmt.Printf("\tResource Type : %s, Identifier/Name : %s\n", resource.Type, resource.Identifier)
		}
	} else {
		fmt.Println("Used existing resources to ingest data")
	}
}