func NewOutputPlugin()

in kinesis/kinesis.go [117:171]


func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, concurrency, retryLimit int, isAggregate, appendNewline bool, compression CompressionType, pluginID int) (*OutputPlugin, error) {
	client, err := newPutRecordsClient(roleARN, region, kinesisEndpoint, stsEndpoint, pluginID)
	if err != nil {
		return nil, err
	}

	timer, err := plugins.NewTimeout(func(d time.Duration) {
		logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, d.String())
		logrus.Errorf("[kinesis %d] Quitting Fluent Bit", pluginID)
		os.Exit(1)
	})

	if err != nil {
		return nil, err
	}

	stringGen := util.NewRandomStringGenerator(8)

	var timeFormatter *strftime.Strftime
	if timeKey != "" {
		if timeFmt == "" {
			timeFmt = defaultTimeFmt
		}
		timeFormatter, err = strftime.New(timeFmt, strftime.WithMilliseconds('L'), strftime.WithMicroseconds('f'))
		if err != nil {
			logrus.Errorf("[kinesis %d] Issue with strftime format in 'time_key_format'", pluginID)
			return nil, err
		}
	}

	var aggregator *aggregate.Aggregator
	if isAggregate {
		aggregator = aggregate.NewAggregator(stringGen)
	}

	return &OutputPlugin{
		stream:                stream,
		client:                client,
		dataKeys:              dataKeys,
		partitionKey:          partitionKey,
		appendNewline:         appendNewline,
		timeKey:               timeKey,
		fmtStrftime:           timeFormatter,
		logKey:                logKey,
		timer:                 timer,
		PluginID:              pluginID,
		stringGen:             stringGen,
		Concurrency:           concurrency,
		concurrencyRetryLimit: retryLimit,
		isAggregate:           isAggregate,
		aggregator:            aggregator,
		compression:           compression,
		replaceDots:           replaceDots,
	}, nil
}