in kinesis/kinesis.go [117:171]
// NewOutputPlugin assembles an OutputPlugin from the supplied configuration.
//
// roleARN, region, kinesisEndpoint, stsEndpoint and pluginID are handed to
// newPutRecordsClient to build the client stored on the plugin. The remaining
// string, int and bool settings are copied onto the returned struct unchanged.
//
// When timeKey is non-empty, a strftime formatter is compiled from timeFmt
// (falling back to defaultTimeFmt when timeFmt is empty), with 'L' registered
// as a milliseconds verb and 'f' as a microseconds verb; otherwise the
// plugin's formatter is left nil. When isAggregate is true an aggregator is
// created that shares the plugin's random string generator; otherwise the
// aggregator is left nil.
//
// An error is returned, together with a nil plugin, if the client, the
// timeout, or the time formatter cannot be created.
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int) (*OutputPlugin, error) {
	putClient, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint, pluginID)
	if err != nil {
		return nil, err
	}

	// Callback given to the timeout helper: it logs how long sends have been
	// failing and then terminates the whole process with exit status 1.
	// NOTE(review): exactly when the helper invokes this is decided inside
	// the plugins package — confirm there.
	onTimeout := func(elapsed time.Duration) {
		logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, elapsed.String())
		logrus.Errorf("[kinesis %d] Quitting Fluent Bit", pluginID)
		os.Exit(1)
	}
	sendTimeout, err := plugins.NewTimeout(onTimeout)
	if err != nil {
		return nil, err
	}

	randomStrings := util.NewRandomStringGenerator(8)

	// The formatter is only needed when a time key is configured.
	var timestampFormat *strftime.Strftime
	if timeKey != "" {
		layout := timeFmt
		if layout == "" {
			layout = defaultTimeFmt
		}
		timestampFormat, err = strftime.New(
			layout,
			strftime.WithMilliseconds('L'),
			strftime.WithMicroseconds('f'),
		)
		if err != nil {
			logrus.Errorf("[kinesis %d] Issue with strftime format in 'time_key_format'", pluginID)
			return nil, err
		}
	}

	// The aggregator stays nil unless aggregation was requested.
	var recordAggregator *aggregate.Aggregator
	if isAggregate {
		recordAggregator = aggregate.NewAggregator(randomStrings)
	}

	plugin := &OutputPlugin{
		// Destination and transport.
		stream: stream,
		client: putClient,
		timer:  sendTimeout,

		// Record shaping.
		dataKeys:      dataKeys,
		partitionKey:  partitionKey,
		logKey:        logKey,
		replaceDots:   replaceDots,
		appendNewline: appendNewline,
		compression:   compression,

		// Timestamp injection.
		timeKey:     timeKey,
		fmtStrftime: timestampFormat,

		// Aggregation.
		isAggregate: isAggregate,
		aggregator:  recordAggregator,
		stringGen:   randomStrings,

		// Identity and concurrency settings.
		PluginID:              pluginID,
		Concurrency:           concurrency,
		concurrencyRetryLimit: retryLimit,
	}
	return plugin, nil
}