in appender.go [100:159]
// New validates cfg and returns an Appender that sends requests through client.
//
// cfg is passed through DefaultConfig before validation. New returns an error
// when:
//   - client is nil;
//   - cfg.CompressionLevel is outside the range [-1,9];
//   - compression is enabled (CompressionLevel != 0) and cfg.FlushBytes is
//     smaller than 16 KiB;
//   - the metrics cannot be created;
//   - the bulk indexer configuration derived from client and cfg fails
//     validation.
//
// All of these checks run before the Appender is registered with the pool or
// any goroutine is started, so a failed call leaves neither behind.
//
// On success the returned Appender has been registered with
// cfg.BulkIndexerPool and a goroutine running runActiveIndexer has been
// started on the Appender's errgroup.
func New(client elastictransport.Interface, cfg Config) (*Appender, error) {
	// Reject a nil client before it is handed to any other function.
	if client == nil {
		return nil, errors.New("client is nil")
	}
	cfg = DefaultConfig(client, cfg)
	if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 {
		return nil, fmt.Errorf(
			"expected CompressionLevel in range [-1,9], got %d",
			cfg.CompressionLevel,
		)
	}
	const minFlushBytes = 16 * 1024 // 16kb
	if cfg.CompressionLevel != 0 && cfg.FlushBytes < minFlushBytes {
		return nil, fmt.Errorf(
			"flush bytes config value (%d) is too small and will be ignored with compression enabled. Use at least %d",
			cfg.FlushBytes, minFlushBytes,
		)
	}
	ms, err := newMetrics(cfg)
	if err != nil {
		return nil, err
	}
	if err := BulkIndexerConfigFrom(client, cfg).Validate(); err != nil {
		return nil, fmt.Errorf("error creating bulk indexer: %w", err)
	}
	indexer := &Appender{
		pool:      cfg.BulkIndexerPool,
		config:    cfg,
		client:    client,
		closed:    make(chan struct{}),
		bulkItems: make(chan BulkIndexerItem, cfg.DocumentBufferSize),
		metrics:   ms,
	}
	// The tracer is set before the background goroutine is started so that
	// every field of the Appender is fully initialised by the time another
	// goroutine can observe it.
	if cfg.TracerProvider != nil {
		indexer.tracer = cfg.TracerProvider.Tracer("github.com/elastic/go-docappender.appender")
	}
	// Use the Appender's pointer as the unique ID for the BulkIndexerPool.
	// Register the Appender ID in the pool.
	indexer.id = fmt.Sprintf("%p", indexer)
	indexer.pool.Register(indexer.id)
	indexer.addUpDownCount(int64(cfg.MaxRequests), &indexer.availableBulkRequests, ms.availableBulkRequests)
	// We create a cancellable context for the errgroup.Group for unblocking
	// flushes when Close returns. We intentionally do not use errgroup.WithContext,
	// because one flush failure should not cause the context to be cancelled.
	indexer.errgroupContext, indexer.cancelErrgroupContext = context.WithCancelCause(
		context.Background(),
	)
	indexer.scalingInfo.Store(scalingInfo{activeIndexers: 1})
	// Start the goroutine last: from here on the Appender is shared state.
	indexer.errgroup.Go(func() error {
		indexer.runActiveIndexer()
		return nil
	})
	return indexer, nil
}