in sample_apps/go/ingestion-csv-sample.go [27:197]
func main() {
/**
* Recommended Timestream write client SDK configuration:
* - Set SDK retry count to 10.
* - Use SDK DEFAULT_BACKOFF_STRATEGY
* - Request timeout of 20 seconds
*/
// Setting 20 seconds for timeout
tr := &http.Transport{
ResponseHeaderTimeout: 20 * time.Second,
// Using DefaultTransport values for other parameters: https://golang.org/pkg/net/http/#RoundTripper
Proxy: http.ProxyFromEnvironment,
DialContext: (&net.Dialer{
KeepAlive: 30 * time.Second,
DualStack: true,
Timeout: 30 * time.Second,
}).DialContext,
MaxIdleConns: 100,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}
// So client makes HTTP/2 requests
http2.ConfigureTransport(tr)
sess, err := session.NewSession(&aws.Config{Region: aws.String("us-east-1"), MaxRetries: aws.Int(10), HTTPClient: &http.Client{Transport: tr}})
writeSvc := timestreamwrite.New(sess)
// setup the query client
sessQuery, err := session.NewSession(&aws.Config{Region: aws.String("us-east-1")})
querySvc := timestreamquery.New(sessQuery)
databaseName := flag.String("database_name", "devops", "database name string")
tableName := flag.String("table_name", "host_metrics", "table name string")
testFileName := flag.String("test_file", "../data/sample.csv", "CSV file containing the data to ingest")
flag.Parse()
// Describe database.
describeDatabaseInput := ×treamwrite.DescribeDatabaseInput{
DatabaseName: aws.String(*databaseName),
}
describeDatabaseOutput, err := writeSvc.DescribeDatabase(describeDatabaseInput)
if err != nil {
fmt.Println("Error:")
fmt.Println(err)
// Create database if database doesn't exist.
serr, ok := err.(*timestreamwrite.ResourceNotFoundException)
fmt.Println(serr)
if ok {
fmt.Println("Creating database")
createDatabaseInput := ×treamwrite.CreateDatabaseInput{
DatabaseName: aws.String(*databaseName),
}
_, err = writeSvc.CreateDatabase(createDatabaseInput)
if err != nil {
panic(fmt.Sprintf("Error while creating database: %s", err))
}
}
} else {
fmt.Println("Database exists")
fmt.Println(describeDatabaseOutput)
}
// Describe table.
describeTableInput := ×treamwrite.DescribeTableInput{
DatabaseName: aws.String(*databaseName),
TableName: aws.String(*tableName),
}
describeTableOutput, err := writeSvc.DescribeTable(describeTableInput)
if err != nil {
fmt.Println("Error:")
fmt.Println(err)
serr, ok := err.(*timestreamwrite.ResourceNotFoundException)
fmt.Println(serr)
if ok {
// Create table if table doesn't exist.
fmt.Println("Creating the table")
createTableInput := ×treamwrite.CreateTableInput{
DatabaseName: aws.String(*databaseName),
TableName: aws.String(*tableName),
}
_, err = writeSvc.CreateTable(createTableInput)
if err != nil {
panic(fmt.Sprintf("Error while creating table: %s", err))
}
}
} else {
fmt.Println("Table exists")
fmt.Println(describeTableOutput)
}
csvfile, err := os.Open(*testFileName)
records := make([]*timestreamwrite.Record, 0)
if err != nil {
panic(fmt.Sprintf("Couldn't open the csv file %s", err))
}
// Get current time in nano seconds.
currentTimeInMilliSeconds := time.Now().UnixNano() / int64(time.Millisecond)
// Counter for number of records.
counter := int64(0)
reader := csv.NewReader(csvfile)
// Iterate through the records
for {
// Read each record from csv
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
fmt.Println(err)
}
records = append(records, ×treamwrite.Record{
Dimensions: []*timestreamwrite.Dimension{
×treamwrite.Dimension{
Name: aws.String(record[0]),
Value: aws.String(record[1]),
},
×treamwrite.Dimension{
Name: aws.String(record[2]),
Value: aws.String(record[3]),
},
×treamwrite.Dimension{
Name: aws.String(record[4]),
Value: aws.String(record[5]),
},
},
MeasureName: aws.String(record[6]),
MeasureValue: aws.String(record[7]),
MeasureValueType: aws.String(record[8]),
Time: aws.String(strconv.FormatInt(currentTimeInMilliSeconds-counter*int64(50), 10)),
TimeUnit: aws.String("MILLISECONDS"),
})
counter++
// WriteRecordsRequest has 100 records limit per request.
if counter%100 == 0 {
writeBatch(writeSvc, *databaseName, *tableName, records)
records = make([]*timestreamwrite.Record, 0)
fmt.Printf("Ingested %d records to the table.\n", counter)
}
}
if len(records) > 0 {
writeBatch(writeSvc, *databaseName, *tableName, records)
fmt.Printf("Ingested %d records to the table.\n", counter)
}
queryInput := ×treamquery.QueryInput{
QueryString: aws.String("select count(*) from " + *databaseName + "." + *tableName),
}
// execute the query
query, err := querySvc.Query(queryInput)
if err != nil {
fmt.Println("Error:")
fmt.Println(err)
} else {
fmt.Println(query)
}
}