in internal/beater/beater.go [718:779]
func (s *Runner) newDocappenderConfig(tp trace.TracerProvider, mp metric.MeterProvider, memLimit float64) (
docappender.Config, *elasticsearch.Config, error,
) {
esConfig := struct {
*elasticsearch.Config `config:",inline"`
FlushBytes string `config:"flush_bytes"`
FlushInterval time.Duration `config:"flush_interval"`
MaxRequests int `config:"max_requests"`
Scaling struct {
Enabled *bool `config:"enabled"`
} `config:"autoscaling"`
}{
// Default to 1mib flushes, which is the default for go-docappender.
FlushBytes: "1 mib",
FlushInterval: time.Second,
Config: elasticsearch.DefaultConfig(),
}
esConfig.MaxIdleConnsPerHost = 10
if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
return docappender.Config{}, nil, err
}
var flushBytes int
if esConfig.FlushBytes != "" {
b, err := humanize.ParseBytes(esConfig.FlushBytes)
if err != nil {
return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
}
flushBytes = int(b)
}
minFlush := 24 * 1024
if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
flushBytes = minFlush
}
var scalingCfg docappender.ScalingConfig
if enabled := esConfig.Scaling.Enabled; enabled != nil {
scalingCfg.Disabled = !*enabled
}
cfg := docappenderConfig(docappender.Config{
CompressionLevel: esConfig.CompressionLevel,
FlushBytes: flushBytes,
FlushInterval: esConfig.FlushInterval,
TracerProvider: tp,
MeterProvider: mp,
MaxRequests: esConfig.MaxRequests,
Scaling: scalingCfg,
Logger: zap.New(s.logger.Core(), zap.WithCaller(true)),
RequireDataStream: true,
IncludeSourceOnError: docappender.False,
// Use the output's max_retries to configure the go-docappender's
// document level retries.
MaxDocumentRetries: esConfig.MaxRetries,
RetryOnDocumentStatus: []int{429}, // Only retry "safe" 429 responses.
}, memLimit, s.logger)
if cfg.MaxRequests != 0 {
esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
}
return cfg, esConfig.Config, nil
}