in fluent-bit-kinesis.go [59:164]
// newKinesisOutput reads the configuration of one plugin instance through
// output.FLBPluginConfigKey, logs every raw value at info level, validates
// the values and passes them to kinesis.NewOutputPlugin.
//
// ctx is the plugin context handed to output.FLBPluginConfigKey. pluginID
// tags every log and error message and is forwarded to the constructor.
//
// A nil plugin and a non-nil error are returned when:
//   - stream or region is empty;
//   - partition_key is exactly "log";
//   - experimental_concurrency is set but is not an integer in
//     [0, maximumConcurrency];
//   - experimental_concurrency_retries is set but is not a non-negative
//     integer;
//   - compression is non-empty and, after lower-casing, matches none of
//     kinesis.CompressionZlib, kinesis.CompressionGzip or
//     kinesis.CompressionNone.
//
// Integer parse failures wrap the underlying strconv error, so callers can
// still inspect it with errors.Is / errors.As. Otherwise the result of
// kinesis.NewOutputPlugin is returned unchanged.
func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, error) {
	stream := output.FLBPluginConfigKey(ctx, "stream")
	logrus.Infof("[kinesis %d] plugin parameter stream = '%s'", pluginID, stream)
	region := output.FLBPluginConfigKey(ctx, "region")
	logrus.Infof("[kinesis %d] plugin parameter region = '%s'", pluginID, region)
	dataKeys := output.FLBPluginConfigKey(ctx, "data_keys")
	logrus.Infof("[kinesis %d] plugin parameter data_keys = '%s'", pluginID, dataKeys)
	partitionKey := output.FLBPluginConfigKey(ctx, "partition_key")
	logrus.Infof("[kinesis %d] plugin parameter partition_key = '%s'", pluginID, partitionKey)
	roleARN := output.FLBPluginConfigKey(ctx, "role_arn")
	logrus.Infof("[kinesis %d] plugin parameter role_arn = '%s'", pluginID, roleARN)
	kinesisEndpoint := output.FLBPluginConfigKey(ctx, "endpoint")
	logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, kinesisEndpoint)
	stsEndpoint := output.FLBPluginConfigKey(ctx, "sts_endpoint")
	logrus.Infof("[kinesis %d] plugin parameter sts_endpoint = '%s'", pluginID, stsEndpoint)
	appendNewline := output.FLBPluginConfigKey(ctx, "append_newline")
	logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline)
	timeKey := output.FLBPluginConfigKey(ctx, "time_key")
	logrus.Infof("[kinesis %d] plugin parameter time_key = '%s'", pluginID, timeKey)
	timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
	logrus.Infof("[kinesis %d] plugin parameter time_key_format = '%s'", pluginID, timeKeyFmt)
	concurrency := output.FLBPluginConfigKey(ctx, "experimental_concurrency")
	logrus.Infof("[kinesis %d] plugin parameter experimental_concurrency = '%s'", pluginID, concurrency)
	concurrencyRetries := output.FLBPluginConfigKey(ctx, "experimental_concurrency_retries")
	logrus.Infof("[kinesis %d] plugin parameter experimental_concurrency_retries = '%s'", pluginID, concurrencyRetries)
	logKey := output.FLBPluginConfigKey(ctx, "log_key")
	logrus.Infof("[kinesis %d] plugin parameter log_key = '%s'", pluginID, logKey)
	aggregation := output.FLBPluginConfigKey(ctx, "aggregation")
	logrus.Infof("[kinesis %d] plugin parameter aggregation = '%s'", pluginID, aggregation)
	compression := output.FLBPluginConfigKey(ctx, "compression")
	logrus.Infof("[kinesis %d] plugin parameter compression = '%s'", pluginID, compression)
	replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots")
	logrus.Infof("[kinesis %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots)

	if stream == "" || region == "" {
		return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
	}

	if partitionKey == "log" {
		return nil, fmt.Errorf("[kinesis %d] 'log' cannot be set as the partition key", pluginID)
	}
	if partitionKey == "" {
		logrus.Infof("[kinesis %d] no partition key provided. A random one will be generated.", pluginID)
	}

	// Boolean flags are enabled only by the literal "true" (any letter case);
	// every other value, including the empty string, leaves them disabled.
	appendNL := strings.EqualFold(appendNewline, "true")
	isAggregate := strings.EqualFold(aggregation, "true")

	if isAggregate && partitionKey != "" {
		// Not fatal: the combination is discouraged, so report it at warning
		// level and carry on.
		logrus.Warnf("[kinesis %d] WARNING: The options 'aggregation' and 'partition_key' should not be used simultaneously", pluginID)
	}

	// An unset experimental_concurrency leaves concurrencyInt at zero.
	var concurrencyInt int
	if concurrency != "" {
		parsed, err := strconv.Atoi(concurrency)
		if err != nil {
			// Return a contextual error, as every other validation failure
			// in this function does, instead of logging here and returning
			// the bare strconv error.
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified: %w", pluginID, concurrency, err)
		}
		if parsed < 0 {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency)
		}
		if parsed > maximumConcurrency {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency)
		}
		if parsed > 0 {
			logrus.Warnf("[kinesis %d] WARNING: Enabling concurrency can lead to data loss. If 'experimental_concurrency_retries' is reached data will be lost.", pluginID)
		}
		concurrencyInt = parsed
	}

	// An unset experimental_concurrency_retries falls back to the default.
	concurrencyRetriesInt := defaultConcurrentRetries
	if concurrencyRetries != "" {
		parsed, err := strconv.Atoi(concurrencyRetries)
		if err != nil {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified: %w", pluginID, concurrencyRetries, err)
		}
		if parsed < 0 {
			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries)
		}
		concurrencyRetriesInt = parsed
	}

	// Compare case-insensitively by lower-casing once. A tagless switch is
	// used so the cases are ordinary boolean expressions evaluated in order.
	var comp kinesis.CompressionType
	compressionLower := strings.ToLower(compression)
	switch {
	case compressionLower == string(kinesis.CompressionZlib):
		comp = kinesis.CompressionZlib
	case compressionLower == string(kinesis.CompressionGzip):
		comp = kinesis.CompressionGzip
	case compressionLower == string(kinesis.CompressionNone) || compression == "":
		comp = kinesis.CompressionNone
	default:
		return nil, fmt.Errorf("[kinesis %d] Invalid 'compression' value (%s) specified, must be 'zlib', 'gzip', 'none', or undefined", pluginID, compression)
	}

	return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, concurrencyInt, concurrencyRetriesInt, isAggregate, appendNL, comp, pluginID)
}