// plugins/outputs/timestream/timestream.go
package timestream
import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
"sync"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite"
"github.com/aws/aws-sdk-go-v2/service/timestreamwrite/types"
"github.com/aws/smithy-go"
"github.com/influxdata/telegraf"
internalaws "github.com/influxdata/telegraf/config/aws"
"github.com/influxdata/telegraf/plugins/outputs"
)
type (
Timestream struct {
MappingMode string `toml:"mapping_mode"`
DescribeDatabaseOnStart bool `toml:"describe_database_on_start"`
DatabaseName string `toml:"database_name"`
SingleTableName string `toml:"single_table_name"`
SingleTableDimensionNameForTelegrafMeasurementName string `toml:"single_table_dimension_name_for_telegraf_measurement_name"`
CreateTableIfNotExists bool `toml:"create_table_if_not_exists"`
CreateTableMagneticStoreRetentionPeriodInDays int64 `toml:"create_table_magnetic_store_retention_period_in_days"`
CreateTableMemoryStoreRetentionPeriodInHours int64 `toml:"create_table_memory_store_retention_period_in_hours"`
CreateTableTags map[string]string `toml:"create_table_tags"`
MaxWriteGoRoutinesCount int `toml:"max_write_go_routines"`
Log telegraf.Logger
svc WriteClient
internalaws.CredentialConfig
}
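// WriteClient covers the subset of the timestreamwrite.Client API that the
// plugin uses, so tests can substitute a mock implementation.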
WriteClient interface {
CreateTable(context.Context, *timestreamwrite.CreateTableInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.CreateTableOutput, error)
WriteRecords(context.Context, *timestreamwrite.WriteRecordsInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.WriteRecordsOutput, error)
DescribeDatabase(context.Context, *timestreamwrite.DescribeDatabaseInput, ...func(*timestreamwrite.Options)) (*timestreamwrite.DescribeDatabaseOutput, error)
}
)
// Mapping modes specify how the Telegraf model should be represented in the Timestream model.
// See sample config for more details.
const (
MappingModeSingleTable = "single-table"
MappingModeMultiTable = "multi-table"
)
// MaxRecordsPerCall reflects the Timestream limit of a single WriteRecords API call
const MaxRecordsPerCall = 100
// Default value for the maximum number of parallel goroutines to ingest/write data
// when max_write_go_routines is not specified in the config
const MaxWriteRoutinesDefault = 1
var sampleConfig = `
## Amazon Region
region = "us-east-1"
## Amazon Credentials
## Credentials are loaded in the following order:
## 1) Web identity provider credentials via STS if role_arn and web_identity_token_file are specified
## 2) Assumed credentials via STS if role_arn is specified
## 3) Explicit credentials from 'access_key' and 'secret_key'
## 4) Shared profile from 'profile'
## 5) Environment variables
## 6) Shared credentials file
## 7) EC2 Instance Profile
# access_key = ""
# secret_key = ""
# token = ""
# role_arn = ""
# web_identity_token_file = ""
# role_session_name = ""
# profile = ""
# shared_credential_file = ""
## Endpoint to make requests against. The correct endpoint is automatically
## determined and this option should only be set if you wish to override the
## default.
## ex: endpoint_url = "http://localhost:8000"
# endpoint_url = ""
## Timestream database where the metrics will be inserted.
## The database must exist prior to starting Telegraf.
database_name = "yourDatabaseNameHere"
## Specifies whether the plugin should describe the Timestream database upon
## starting, as a safety check that it has the necessary permissions, connectivity, etc.
## If the describe operation fails, the plugin will not start
## and therefore the Telegraf agent will not start.
describe_database_on_start = false
## The mapping mode specifies how Telegraf records are represented in Timestream.
## Valid values are: single-table, multi-table.
## For example, consider the following data in line protocol format:
## weather,location=us-midwest,season=summer temperature=82,humidity=71 1465839830100400200
## airquality,location=us-west no2=5,pm25=16 1465839830100400200
## where weather and airquality are the measurement names, location and season are tags,
## and temperature, humidity, no2, pm25 are fields.
## In multi-table mode:
## - first line will be ingested to table named weather
## - second line will be ingested to table named airquality
## - the tags will be represented as dimensions
## - first table (weather) will have two records:
## one with measurement name equals to temperature,
## another with measurement name equals to humidity
## - second table (airquality) will have two records:
## one with measurement name equals to no2,
## another with measurement name equals to pm25
## - the Timestream tables from the example will look like this:
## TABLE "weather":
## time | location | season | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-midwest | summer | temperature | 82
## 2016-06-13 17:43:50 | us-midwest | summer | humidity | 71
## TABLE "airquality":
## time | location | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-west | no2 | 5
## 2016-06-13 17:43:50 | us-west | pm25 | 16
## In single-table mode:
## - the data will be ingested to a single table, whose name will be valueOf(single_table_name)
## - the measurement name will be stored in a dimension named valueOf(single_table_dimension_name_for_telegraf_measurement_name)
## - location and season will be represented as dimensions
## - temperature, humidity, no2, pm25 will be represented as measure names
## - the Timestream table from the example will look like this:
## Assuming:
## - single_table_name = "my_readings"
## - single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## TABLE "my_readings":
## time | location | season | namespace | measure_name | measure_value::bigint
## 2016-06-13 17:43:50 | us-midwest | summer | weather | temperature | 82
## 2016-06-13 17:43:50 | us-midwest | summer | weather | humidity | 71
## 2016-06-13 17:43:50 | us-west | NULL | airquality | no2 | 5
## 2016-06-13 17:43:50 | us-west | NULL | airquality | pm25 | 16
## In most cases, using the multi-table mapping mode is recommended.
## However, consider using single-table when you have thousands of measurement names.
mapping_mode = "multi-table"
## Only valid and required for mapping_mode = "single-table"
## Specifies the Timestream table where the metrics will be uploaded.
# single_table_name = "yourTableNameHere"
## Only valid and required for mapping_mode = "single-table"
## Specifies the Timestream dimension name under which the Telegraf
## measurement name will be stored.
# single_table_dimension_name_for_telegraf_measurement_name = "namespace"
## Specifies whether the plugin should create the table if it does not exist.
## The plugin writes the data without first checking whether the table exists.
## When the table does not exist, the resulting error from Timestream will cause
## the plugin to create the table, if this parameter is set to true.
create_table_if_not_exists = true
## Only valid and required if create_table_if_not_exists = true
## Specifies the Timestream table magnetic store retention period in days.
## Check Timestream documentation for more details.
create_table_magnetic_store_retention_period_in_days = 365
## Only valid and required if create_table_if_not_exists = true
## Specifies the Timestream table memory store retention period in hours.
## Check Timestream documentation for more details.
create_table_memory_store_retention_period_in_hours = 24
## Only valid and optional if create_table_if_not_exists = true
## Specifies the Timestream table tags.
## Check Timestream documentation for more details
# create_table_tags = { "foo" = "bar", "environment" = "dev"}
## Specify the maximum number of parallel goroutines to ingest/write data
## If not specified, defaults to 1 goroutine
max_write_go_routines = 25
`
// WriteFactory function provides a way to mock the client instantiation for testing purposes.
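// A test can substitute its own factory, for example (illustrative sketch;
// mockWriteClient is a hypothetical test double implementing WriteClient):
//
//	WriteFactory = func(*internalaws.CredentialConfig) (WriteClient, error) {
//		return &mockWriteClient{}, nil
//	}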
var WriteFactory = func(credentialConfig *internalaws.CredentialConfig) (WriteClient, error) {
cfg, err := credentialConfig.Credentials()
if err != nil {
return nil, err
}
return timestreamwrite.NewFromConfig(cfg), nil
}
func (t *Timestream) Connect() error {
if t.DatabaseName == "" {
return fmt.Errorf("DatabaseName key is required")
}
if t.MappingMode == "" {
return fmt.Errorf("MappingMode key is required")
}
if t.MappingMode != MappingModeSingleTable && t.MappingMode != MappingModeMultiTable {
return fmt.Errorf("correct MappingMode key values are: '%s', '%s'",
MappingModeSingleTable, MappingModeMultiTable)
}
if t.MappingMode == MappingModeSingleTable {
if t.SingleTableName == "" {
return fmt.Errorf("in '%s' mapping mode, SingleTableName key is required", MappingModeSingleTable)
}
if t.SingleTableDimensionNameForTelegrafMeasurementName == "" {
return fmt.Errorf("in '%s' mapping mode, SingleTableDimensionNameForTelegrafMeasurementName key is required",
MappingModeSingleTable)
}
}
if t.MappingMode == MappingModeMultiTable {
if t.SingleTableName != "" {
return fmt.Errorf("in '%s' mapping mode, do not specify SingleTableName key", MappingModeMultiTable)
}
if t.SingleTableDimensionNameForTelegrafMeasurementName != "" {
return fmt.Errorf("in '%s' mapping mode, do not specify SingleTableDimensionNameForTelegrafMeasurementName key", MappingModeMultiTable)
}
}
if t.CreateTableIfNotExists {
if t.CreateTableMagneticStoreRetentionPeriodInDays < 1 {
return fmt.Errorf("if Telegraf should create tables, CreateTableMagneticStoreRetentionPeriodInDays key should have a value greater than 0")
}
if t.CreateTableMemoryStoreRetentionPeriodInHours < 1 {
return fmt.Errorf("if Telegraf should create tables, CreateTableMemoryStoreRetentionPeriodInHours key should have a value greater than 0")
}
}
if t.MaxWriteGoRoutinesCount <= 0 {
t.MaxWriteGoRoutinesCount = MaxWriteRoutinesDefault
}
t.Log.Infof("Constructing Timestream client for '%s' mode", t.MappingMode)
svc, err := WriteFactory(&t.CredentialConfig)
if err != nil {
return err
}
if t.DescribeDatabaseOnStart {
t.Log.Infof("Describing database '%s' in region '%s'", t.DatabaseName, t.Region)
describeDatabaseInput := &timestreamwrite.DescribeDatabaseInput{
DatabaseName: aws.String(t.DatabaseName),
}
describeDatabaseOutput, err := svc.DescribeDatabase(context.Background(), describeDatabaseInput)
if err != nil {
t.Log.Errorf("Couldn't describe database '%s'. Check error, fix permissions, connectivity, create database.", t.DatabaseName)
return err
}
t.Log.Infof("Describe database '%s' returned: '%s'.", t.DatabaseName, describeDatabaseOutput)
}
t.svc = svc
return nil
}
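// Close is a no-op: the AWS SDK v2 client holds no resources that require
// explicit cleanup.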
func (t *Timestream) Close() error {
return nil
}
func (t *Timestream) SampleConfig() string {
return sampleConfig
}
func (t *Timestream) Description() string {
return "Configuration for Amazon Timestream output."
}
func init() {
outputs.Add("timestream", func() telegraf.Output {
return &Timestream{}
})
}
func (t *Timestream) Write(metrics []telegraf.Metric) error {
writeRecordsInputs := t.TransformMetrics(metrics)
maxWriteJobs := t.MaxWriteGoRoutinesCount
numberOfWriteRecordsInputs := len(writeRecordsInputs)
if numberOfWriteRecordsInputs < maxWriteJobs {
maxWriteJobs = numberOfWriteRecordsInputs
}
var wg sync.WaitGroup
errs := make(chan error, numberOfWriteRecordsInputs)
writeJobs := make(chan *timestreamwrite.WriteRecordsInput, maxWriteJobs)
start := time.Now()
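// Spawn a bounded pool of workers; each drains writeJobs until the channel
// is closed and reports any write error on the buffered errs channel.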
for i := 0; i < maxWriteJobs; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for writeJob := range writeJobs {
if err := t.writeToTimestream(writeJob, true); err != nil {
errs <- err
}
}
}()
}
for i := range writeRecordsInputs {
writeJobs <- writeRecordsInputs[i]
}
// Close channel once all jobs are added
close(writeJobs)
wg.Wait()
elapsed := time.Since(start)
close(errs)
t.Log.Infof("##WriteToTimestream - Metrics size: %d request size: %d time(ms): %d",
len(metrics), len(writeRecordsInputs), elapsed.Milliseconds())
// On partial failures, Telegraf will reject the entire batch of metrics and
// retry. writeToTimestream will return retryable exceptions only.
for err := range errs {
if err != nil {
return err
}
}
return nil
}
func (t *Timestream) writeToTimestream(writeRecordsInput *timestreamwrite.WriteRecordsInput, resourceNotFoundRetry bool) error {
t.Log.Debugf("Writing to Timestream: '%v' with ResourceNotFoundRetry: '%t'", writeRecordsInput, resourceNotFoundRetry)
_, err := t.svc.WriteRecords(context.Background(), writeRecordsInput)
if err != nil {
// Telegraf will retry ingesting the metrics if an error is returned from the plugin.
// Therefore, return error only for retryable exceptions: ThrottlingException and 5xx exceptions.
var notFound *types.ResourceNotFoundException
if errors.As(err, &notFound) {
if resourceNotFoundRetry {
t.Log.Warnf("Failed to write to Timestream database '%s' table '%s'. Error: '%s'",
t.DatabaseName, *writeRecordsInput.TableName, notFound)
return t.createTableAndRetry(writeRecordsInput)
}
t.logWriteToTimestreamError(notFound, writeRecordsInput.TableName)
}
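// Rejected records indicate bad data (e.g. duplicates or timestamps outside
// the retention window); a retry would fail the same way, so log and drop.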
var rejected *types.RejectedRecordsException
if errors.As(err, &rejected) {
t.logWriteToTimestreamError(err, writeRecordsInput.TableName)
return nil
}
var throttling *types.ThrottlingException
if errors.As(err, &throttling) {
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
t.DatabaseName, *writeRecordsInput.TableName, throttling)
}
var internal *types.InternalServerException
if errors.As(err, &internal) {
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
t.DatabaseName, *writeRecordsInput.TableName, internal)
}
var operation *smithy.OperationError
if !errors.As(err, &operation) {
// Retry other, non-aws errors.
return fmt.Errorf("unable to write to Timestream database '%s' table '%s'. Error: %s",
t.DatabaseName, *writeRecordsInput.TableName, err)
}
t.logWriteToTimestreamError(err, writeRecordsInput.TableName)
}
return nil
}
func (t *Timestream) logWriteToTimestreamError(err error, tableName *string) {
t.Log.Errorf("Failed to write to Timestream database '%s' table '%s'. Skipping metric! Error: '%s'",
t.DatabaseName, *tableName, err)
}
func (t *Timestream) createTableAndRetry(writeRecordsInput *timestreamwrite.WriteRecordsInput) error {
if t.CreateTableIfNotExists {
t.Log.Infof("Trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'true'.", *writeRecordsInput.TableName, t.DatabaseName)
err := t.createTable(writeRecordsInput.TableName)
if err == nil {
t.Log.Infof("Table '%s' in database '%s' created. Retrying writing.", *writeRecordsInput.TableName, t.DatabaseName)
return t.writeToTimestream(writeRecordsInput, false)
}
t.Log.Errorf("Failed to create table '%s' in database '%s': %s. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName, err)
} else {
t.Log.Errorf("Not trying to create table '%s' in database '%s', as 'CreateTableIfNotExists' config key is 'false'. Skipping metric!", *writeRecordsInput.TableName, t.DatabaseName)
}
return nil
}
// createTable creates a Timestream table according to the configuration.
func (t *Timestream) createTable(tableName *string) error {
createTableInput := &timestreamwrite.CreateTableInput{
DatabaseName: aws.String(t.DatabaseName),
TableName: aws.String(*tableName),
RetentionProperties: &types.RetentionProperties{
MagneticStoreRetentionPeriodInDays: t.CreateTableMagneticStoreRetentionPeriodInDays,
MemoryStoreRetentionPeriodInHours: t.CreateTableMemoryStoreRetentionPeriodInHours,
},
}
var tags []types.Tag
for key, val := range t.CreateTableTags {
tags = append(tags, types.Tag{
Key: aws.String(key),
Value: aws.String(val),
})
}
createTableInput.Tags = tags
_, err := t.svc.CreateTable(context.Background(), createTableInput)
if err != nil {
var conflict *types.ConflictException
if errors.As(err, &conflict) {
// if the table was created in the meantime, it's ok.
return nil
}
return err
}
return nil
}
// TransformMetrics transforms a collection of Telegraf Metrics into write requests to Timestream.
// Metrics are grouped by the target table name, and requests that exceed the
// WriteRecords limit are split into smaller ones.
// It returns the collection of write requests to be performed against Timestream.
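// For example, in multi-table mode the two metrics from the sample config
// ("weather" and "airquality") produce two WriteRecordsInput values, one per
// table, each carrying the records built from that metric's fields.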
func (t *Timestream) TransformMetrics(metrics []telegraf.Metric) []*timestreamwrite.WriteRecordsInput {
writeRequests := make(map[string]*timestreamwrite.WriteRecordsInput, len(metrics))
for _, m := range metrics {
// build MeasureName, MeasureValue, MeasureValueType
records := t.buildWriteRecords(m)
if len(records) == 0 {
continue
}
var tableName string
if t.MappingMode == MappingModeSingleTable {
tableName = t.SingleTableName
}
if t.MappingMode == MappingModeMultiTable {
tableName = m.Name()
}
if curr, ok := writeRequests[tableName]; !ok {
newWriteRecord := &timestreamwrite.WriteRecordsInput{
DatabaseName: aws.String(t.DatabaseName),
TableName: aws.String(tableName),
Records: records,
CommonAttributes: &types.Record{},
}
writeRequests[tableName] = newWriteRecord
} else {
curr.Records = append(curr.Records, records...)
}
}
// Create the result as a slice of WriteRecordsInput. Split requests above the record count limit into smaller requests.
var result []*timestreamwrite.WriteRecordsInput
for _, writeRequest := range writeRequests {
if len(writeRequest.Records) > MaxRecordsPerCall {
for _, recordsPartition := range partitionRecords(MaxRecordsPerCall, writeRequest.Records) {
newWriteRecord := &timestreamwrite.WriteRecordsInput{
DatabaseName: writeRequest.DatabaseName,
TableName: writeRequest.TableName,
Records: recordsPartition,
CommonAttributes: writeRequest.CommonAttributes,
}
result = append(result, newWriteRecord)
}
} else {
result = append(result, writeRequest)
}
}
return result
}
func (t *Timestream) buildDimensions(point telegraf.Metric) []types.Dimension {
var dimensions []types.Dimension
for tagName, tagValue := range point.Tags() {
dimension := types.Dimension{
Name: aws.String(tagName),
Value: aws.String(tagValue),
}
dimensions = append(dimensions, dimension)
}
if t.MappingMode == MappingModeSingleTable {
dimension := types.Dimension{
Name: aws.String(t.SingleTableDimensionNameForTelegrafMeasurementName),
Value: aws.String(point.Name()),
}
dimensions = append(dimensions, dimension)
}
return dimensions
}
// buildWriteRecords builds the Timestream write records from a Telegraf Metric.
// Each field produces one record; the metric's tags (as dimensions) and time
// are attached to every record.
// Records with an unsupported Metric Field type are skipped.
// It returns a slice of Timestream write records.
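// For example, a "weather" metric with fields temperature=82 and humidity=71
// yields two records that share the same dimensions and time and differ only
// in MeasureName and MeasureValue.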
func (t *Timestream) buildWriteRecords(point telegraf.Metric) []types.Record {
var records []types.Record
dimensions := t.buildDimensions(point)
// The time is the same for all fields of a metric, so compute it once.
timeUnit, timeValue := getTimestreamTime(point.Time())
for fieldName, fieldValue := range point.Fields() {
stringFieldValue, stringFieldValueType, ok := convertValue(fieldValue)
if !ok {
t.Log.Errorf("Skipping field '%s'. The type '%s' is not supported in Timestream as MeasureValue. "+
"Supported values are: [int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64, float32, float64, bool, string]",
fieldName, reflect.TypeOf(fieldValue))
continue
}
record := types.Record{
MeasureName: aws.String(fieldName),
MeasureValueType: stringFieldValueType,
MeasureValue: aws.String(stringFieldValue),
Dimensions: dimensions,
Time: aws.String(timeValue),
TimeUnit: timeUnit,
}
records = append(records, record)
}
return records
}
// partitionRecords splits the Timestream records into smaller slices of at most
// the given size, so that each slice stays under the limit of a Timestream API call.
// It returns a slice of record slices.
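// For example, partitionRecords(100, records) with 250 records returns three
// partitions holding 100, 100 and 50 records respectively.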
func partitionRecords(size int, records []types.Record) [][]types.Record {
numberOfPartitions := len(records) / size
if len(records)%size != 0 {
numberOfPartitions++
}
partitions := make([][]types.Record, numberOfPartitions)
for i := 0; i < numberOfPartitions; i++ {
start := size * i
end := size * (i + 1)
if end > len(records) {
end = len(records)
}
partitions[i] = records[start:end]
}
return partitions
}
// getTimestreamTime produces Timestream TimeUnit and TimeValue with minimum possible granularity
// while maintaining the same information.
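// For example, the sample timestamp 1465839830100400200 is not divisible by
// 1e3, so it yields (NANOSECONDS, "1465839830100400200"), while a metric
// stamped at exactly 2016-06-13 17:43:50 UTC yields (SECONDS, "1465839830").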
func getTimestreamTime(t time.Time) (timeUnit types.TimeUnit, timeValue string) {
nanosTime := t.UnixNano()
if nanosTime%1e9 == 0 {
timeUnit = types.TimeUnitSeconds
timeValue = strconv.FormatInt(nanosTime/1e9, 10)
} else if nanosTime%1e6 == 0 {
timeUnit = types.TimeUnitMilliseconds
timeValue = strconv.FormatInt(nanosTime/1e6, 10)
} else if nanosTime%1e3 == 0 {
timeUnit = types.TimeUnitMicroseconds
timeValue = strconv.FormatInt(nanosTime/1e3, 10)
} else {
timeUnit = types.TimeUnitNanoseconds
timeValue = strconv.FormatInt(nanosTime, 10)
}
return timeUnit, timeValue
}
// convertValue converts a single Field value from a Telegraf Metric and produces
// its Timestream representation: value and valueType.
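// For example: convertValue(int64(82)) returns ("82", BIGINT, true) and
// convertValue(71.5) returns ("71.5", DOUBLE, true), while an unsupported
// type such as time.Time returns ok=false.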
func convertValue(v interface{}) (value string, valueType types.MeasureValueType, ok bool) {
ok = true
switch t := v.(type) {
case int:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatInt(int64(t), 10)
case int8:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatInt(int64(t), 10)
case int16:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatInt(int64(t), 10)
case int32:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatInt(int64(t), 10)
case int64:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatInt(t, 10)
case uint:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatUint(uint64(t), 10)
case uint8:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatUint(uint64(t), 10)
case uint16:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatUint(uint64(t), 10)
case uint32:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatUint(uint64(t), 10)
case uint64:
valueType = types.MeasureValueTypeBigint
value = strconv.FormatUint(t, 10)
case float32:
valueType = types.MeasureValueTypeDouble
value = strconv.FormatFloat(float64(t), 'f', -1, 32)
case float64:
valueType = types.MeasureValueTypeDouble
value = strconv.FormatFloat(t, 'f', -1, 64)
case bool:
valueType = types.MeasureValueTypeBoolean
if t {
value = "true"
} else {
value = "false"
}
case string:
valueType = types.MeasureValueTypeVarchar
value = t
default:
// Skip unsupported type.
ok = false
return value, valueType, ok
}
return value, valueType, ok
}