func New()

in appender.go [100:159]
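
New validates the supplied Config, constructs an Appender backed by the given elastictransport.Interface client, registers it with the configured BulkIndexerPool, and starts the first active indexer goroutine before returning.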


func New(client elastictransport.Interface, cfg Config) (*Appender, error) {
	cfg = DefaultConfig(client, cfg)
	if client == nil {
		return nil, errors.New("client is nil")
	}

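	// CompressionLevel follows gzip semantics: -1 selects the default level,
	// 0 disables compression, and 9 is the highest (slowest) level.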
	if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 {
		return nil, fmt.Errorf(
			"expected CompressionLevel in range [-1,9], got %d",
			cfg.CompressionLevel,
		)
	}

	minFlushBytes := 16 * 1024 // 16 KiB
	if cfg.CompressionLevel != 0 && cfg.FlushBytes < minFlushBytes {
		return nil, fmt.Errorf(
			"flush bytes config value (%d) is too small and will be ignored with compression enabled. Use at least %d",
			cfg.FlushBytes, minFlushBytes,
		)
	}

	ms, err := newMetrics(cfg)
	if err != nil {
		return nil, err
	}
	if err := BulkIndexerConfigFrom(client, cfg).Validate(); err != nil {
		return nil, fmt.Errorf("error creating bulk indexer: %w", err)
	}
	indexer := &Appender{
		pool:      cfg.BulkIndexerPool,
		config:    cfg,
		client:    client,
		closed:    make(chan struct{}),
		bulkItems: make(chan BulkIndexerItem, cfg.DocumentBufferSize),
		metrics:   ms,
	}
	// Use the Appender's pointer as the unique ID for the BulkIndexerPool.
	// Register the Appender ID in the pool.
	indexer.id = fmt.Sprintf("%p", indexer)
	indexer.pool.Register(indexer.id)
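	// Record cfg.MaxRequests bulk requests as initially available, both in
	// the internal counter and in the corresponding metric.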
	indexer.addUpDownCount(int64(cfg.MaxRequests), &indexer.availableBulkRequests, ms.availableBulkRequests)

	// We create a cancellable context for the errgroup.Group for unblocking
	// flushes when Close returns. We intentionally do not use errgroup.WithContext,
	// because one flush failure should not cause the context to be cancelled.
	indexer.errgroupContext, indexer.cancelErrgroupContext = context.WithCancelCause(
		context.Background(),
	)
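	// Start with a single active indexer goroutine; runActiveIndexer may
	// scale this number up or down according to the scaling configuration.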
	indexer.scalingInfo.Store(scalingInfo{activeIndexers: 1})
	indexer.errgroup.Go(func() error {
		indexer.runActiveIndexer()
		return nil
	})

	if cfg.TracerProvider != nil {
		indexer.tracer = cfg.TracerProvider.Tracer("github.com/elastic/go-docappender.appender")
	}

	return indexer, nil
}
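
A minimal usage sketch (not part of appender.go), assuming the github.com/elastic/go-docappender/v2 module path and a go-elasticsearch v8 client, which satisfies elastictransport.Interface; the Config values are illustrative and only exercise fields validated above:

package main

import (
	"context"
	"log"

	"github.com/elastic/go-docappender/v2"
	"github.com/elastic/go-elasticsearch/v8"
)

func main() {
	// Any elastictransport.Interface implementation works; here a plain
	// go-elasticsearch client pointed at a local cluster.
	client, err := elasticsearch.NewClient(elasticsearch.Config{
		Addresses: []string{"http://localhost:9200"},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Illustrative values only: with compression enabled (level 1),
	// FlushBytes must be at least 16 KiB, per the checks in New.
	appender, err := docappender.New(client, docappender.Config{
		CompressionLevel: 1,
		FlushBytes:       1024 * 1024, // 1 MiB
	})
	if err != nil {
		log.Fatal(err)
	}
	// Documents are then enqueued with appender.Add and flushed in bulk
	// in the background; Close drains any remaining documents.
	defer appender.Close(context.Background())
}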