func()

in internal/beater/beater.go [718:779]


// newDocappenderConfig builds the go-docappender configuration from the
// Runner's Elasticsearch output configuration.
//
// tp and mp are installed as the docappender's TracerProvider and
// MeterProvider. memLimit is forwarded unchanged to docappenderConfig,
// which produces the final docappender.Config.
//
// It returns the docappender configuration together with the unpacked
// *elasticsearch.Config. An error is returned if the output configuration
// cannot be unpacked, or if flush_bytes cannot be parsed or does not fit
// in an int.
func (s *Runner) newDocappenderConfig(tp trace.TracerProvider, mp metric.MeterProvider, memLimit float64) (
	docappender.Config, *elasticsearch.Config, error,
) {
	// maxInt is the largest value representable by int on this platform.
	const maxInt = int(^uint(0) >> 1)
	// minFlush is the lower bound enforced on flush_bytes when compression
	// is enabled.
	const minFlush = 24 * 1024

	esConfig := struct {
		*elasticsearch.Config `config:",inline"`
		FlushBytes            string        `config:"flush_bytes"`
		FlushInterval         time.Duration `config:"flush_interval"`
		MaxRequests           int           `config:"max_requests"`
		Scaling               struct {
			Enabled *bool `config:"enabled"`
		} `config:"autoscaling"`
	}{
		// Default to 1mib flushes, which is the default for go-docappender.
		FlushBytes:    "1 mib",
		FlushInterval: time.Second,
		Config:        elasticsearch.DefaultConfig(),
	}
	// Seed the idle connection default before unpacking the output config.
	esConfig.MaxIdleConnsPerHost = 10

	if err := s.elasticsearchOutputConfig.Unpack(&esConfig); err != nil {
		return docappender.Config{}, nil, err
	}

	// An explicitly empty flush_bytes leaves flushBytes at zero.
	var flushBytes int
	if esConfig.FlushBytes != "" {
		b, err := humanize.ParseBytes(esConfig.FlushBytes)
		if err != nil {
			return docappender.Config{}, nil, fmt.Errorf("failed to parse flush_bytes: %w", err)
		}
		// ParseBytes yields a uint64; reject values that would silently
		// wrap around (possibly to a negative number) when narrowed to int.
		if b > uint64(maxInt) {
			return docappender.Config{}, nil, fmt.Errorf(
				"flush_bytes value %q is too large: maximum is %d bytes", esConfig.FlushBytes, maxInt,
			)
		}
		flushBytes = int(b)
	}
	// With compression enabled, raise flush_bytes to at least minFlush.
	if esConfig.CompressionLevel != 0 && flushBytes < minFlush {
		s.logger.Warnf("flush_bytes config value is too small (%d) and might be ignored by the indexer, increasing value to %d", flushBytes, minFlush)
		flushBytes = minFlush
	}

	// Only set Scaling.Disabled when autoscaling.enabled was explicitly
	// configured; otherwise the zero-value ScalingConfig is used.
	var scalingCfg docappender.ScalingConfig
	if enabled := esConfig.Scaling.Enabled; enabled != nil {
		scalingCfg.Disabled = !*enabled
	}

	cfg := docappenderConfig(docappender.Config{
		CompressionLevel:     esConfig.CompressionLevel,
		FlushBytes:           flushBytes,
		FlushInterval:        esConfig.FlushInterval,
		TracerProvider:       tp,
		MeterProvider:        mp,
		MaxRequests:          esConfig.MaxRequests,
		Scaling:              scalingCfg,
		Logger:               zap.New(s.logger.Core(), zap.WithCaller(true)),
		RequireDataStream:    true,
		IncludeSourceOnError: docappender.False,
		// Use the output's max_retries to configure the go-docappender's
		// document level retries.
		MaxDocumentRetries:    esConfig.MaxRetries,
		RetryOnDocumentStatus: []int{429}, // Only retry "safe" 429 responses.
	}, memLimit, s.logger)

	// When docappenderConfig yields a non-zero MaxRequests, use it as the
	// per-host idle connection limit in place of the earlier value.
	if cfg.MaxRequests != 0 {
		esConfig.MaxIdleConnsPerHost = cfg.MaxRequests
	}

	return cfg, esConfig.Config, nil
}