func NewOutputPlugin()

in firehose/firehose.go [73:117]


// NewOutputPlugin builds an OutputPlugin for the given delivery stream.
//
// It creates the record batcher client via newPutRecordBatcher (from roleARN,
// region, firehoseEndpoint, stsEndpoint and pluginID), registers a timeout
// handler with plugins.NewTimeout, and, when timeKey is non-empty, compiles a
// strftime formatter from timeFmt (falling back to defaultTimeFmt when timeFmt
// is empty). The remaining arguments are stored on the plugin unchanged.
//
// A non-nil error is returned, with a nil plugin, if the client, the timeout,
// or the time formatter cannot be created.
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int, simpleAggregation bool) (*OutputPlugin, error) {
	batcher, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint, pluginID)
	if err != nil {
		return nil, err
	}

	// Callback handed to plugins.NewTimeout: it reports the elapsed duration
	// and then terminates the whole process with exit status 1.
	onTimeout := func(elapsed time.Duration) {
		logrus.Errorf("[firehose %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, elapsed.String())
		logrus.Errorf("[firehose %d] Quitting Fluent Bit", pluginID)
		os.Exit(1)
	}
	timeout, err := plugins.NewTimeout(onTimeout)
	if err != nil {
		return nil, err
	}

	// The formatter stays nil unless a time key was configured.
	var formatter *strftime.Strftime
	if timeKey != "" {
		layout := timeFmt
		if layout == "" {
			layout = defaultTimeFmt
		}
		// 'L' and 'f' are registered as the milliseconds and microseconds
		// specifications on top of the base strftime set.
		formatter, err = strftime.New(layout, strftime.WithMilliseconds('L'), strftime.WithMicroseconds('f'))
		if err != nil {
			logrus.Errorf("[firehose %d] Issue with strftime format in 'time_key_format'", pluginID)
			return nil, err
		}
	}

	plugin := &OutputPlugin{
		region:         region,
		deliveryStream: deliveryStream,
		client:         batcher,
		// Empty buffer with capacity reserved for maximumRecordsPerPut records.
		records:           make([]*firehose.Record, 0, maximumRecordsPerPut),
		dataKeys:          dataKeys,
		timer:             timeout,
		timeKey:           timeKey,
		fmtStrftime:       formatter,
		logKey:            logKey,
		PluginID:          pluginID,
		replaceDots:       replaceDots,
		simpleAggregation: simpleAggregation,
	}
	return plugin, nil
}