in firehose/firehose.go [73:117]
// NewOutputPlugin assembles an OutputPlugin for the given delivery stream.
//
// It builds, in order: the batch client (via newPutRecordBatcher), a timeout
// (via plugins.NewTimeout) whose callback logs and terminates the process, and,
// only when timeKey is non-empty, a strftime formatter for timeFmt. An empty
// timeFmt falls back to defaultTimeFmt. The first failure among these steps is
// returned with a nil plugin.
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int, simpleAggregation bool) (*OutputPlugin, error) {
	batcher, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint, pluginID)
	if err != nil {
		return nil, err
	}

	// Callback handed to the timeout: report how long sending has been
	// failing, then exit the whole process with status 1.
	onTimeout := func(elapsed time.Duration) {
		logrus.Errorf("[firehose %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, elapsed.String())
		logrus.Errorf("[firehose %d] Quitting Fluent Bit", pluginID)
		os.Exit(1)
	}
	timeout, err := plugins.NewTimeout(onTimeout)
	if err != nil {
		return nil, err
	}

	// The formatter stays nil unless a time key was configured.
	var formatter *strftime.Strftime
	if timeKey != "" {
		layout := timeFmt
		if layout == "" {
			layout = defaultTimeFmt
		}
		// 'L' and 'f' are registered as the millisecond and microsecond
		// specification characters respectively.
		formatter, err = strftime.New(layout, strftime.WithMilliseconds('L'), strftime.WithMicroseconds('f'))
		if err != nil {
			logrus.Errorf("[firehose %d] Issue with strftime format in 'time_key_format'", pluginID)
			return nil, err
		}
	}

	// Start with an empty buffer whose capacity is maximumRecordsPerPut.
	pending := make([]*firehose.Record, 0, maximumRecordsPerPut)

	plugin := &OutputPlugin{
		PluginID:          pluginID,
		region:            region,
		deliveryStream:    deliveryStream,
		dataKeys:          dataKeys,
		logKey:            logKey,
		replaceDots:       replaceDots,
		simpleAggregation: simpleAggregation,
		timeKey:           timeKey,
		fmtStrftime:       formatter,
		client:            batcher,
		timer:             timeout,
		records:           pending,
	}
	return plugin, nil
}