esutil/bulk_indexer.go (503 lines of code) (raw):
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package esutil
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/elastic/go-elasticsearch/v9"
"github.com/elastic/go-elasticsearch/v9/esapi"
)
// BulkIndexer represents a parallel, asynchronous, efficient indexer for Elasticsearch.
//
// Items are queued with Add, batched by background workers and sent with the
// Bulk API. Per-item results are delivered through the OnSuccess and OnFailure
// callbacks of each BulkIndexerItem.
type BulkIndexer interface {
// Add adds an item to the indexer. It returns an error when the item cannot be added.
// Use the OnSuccess and OnFailure callbacks to get the operation result for the item.
//
// You must call the Close() method after you're done adding items.
//
// It is safe for concurrent use. When it's called from goroutines,
// they must finish before the call to Close, e.g. by using sync.WaitGroup.
Add(context.Context, BulkIndexerItem) error
// Close waits until all added items are flushed and closes the indexer.
// No items may be added after Close has been called.
Close(context.Context) error
// Stats returns a snapshot of the indexer statistics.
Stats() BulkIndexerStats
}
// BulkIndexerConfig represents configuration of the indexer.
//
// Fields left at their zero value are replaced by the documented defaults in
// NewBulkIndexer.
type BulkIndexerConfig struct {
NumWorkers int // The number of workers. Defaults to runtime.NumCPU().
FlushBytes int // The flush threshold in bytes. Defaults to 5MB.
FlushInterval time.Duration // The flush threshold as duration. Defaults to 30 seconds.
Client esapi.Transport // The Elasticsearch client. Defaults to elasticsearch.NewDefaultClient().
Decoder BulkResponseJSONDecoder // A custom JSON decoder for the bulk response. Defaults to encoding/json.
DebugLogger BulkIndexerDebugLogger // An optional logger for debugging.
OnError func(context.Context, error) // Called for indexer errors.
OnFlushStart func(context.Context) context.Context // Called when the flush starts; the returned context is used for the flush.
OnFlushEnd func(context.Context) // Called when the flush ends, with the context returned by OnFlushStart.
// Parameters of the Bulk API.
// They are copied into every bulk request issued by the workers.
Index string
ErrorTrace bool
FilterPath []string
// Header is cloned for each request before the client meta header is set on it.
Header http.Header
Human bool
Pipeline string
Pretty bool
Refresh string
Routing string
// RequireAlias is only sent with the request when it is true.
RequireAlias bool
Source []string
SourceExcludes []string
SourceIncludes []string
Timeout time.Duration
WaitForActiveShards string
}
// BulkIndexerStats represents the indexer statistics.
// It is a point-in-time snapshot returned by BulkIndexer.Stats.
type BulkIndexerStats struct {
// NumAdded counts calls to Add; it is incremented before the item is queued.
NumAdded uint64
// NumFlushed counts items reported as successful in a bulk response.
NumFlushed uint64
// NumFailed counts items that could not be written to a worker buffer,
// items rejected in a bulk response, and all items of a bulk request
// that failed as a whole.
NumFailed uint64
// NumIndexed, NumCreated, NumUpdated and NumDeleted break NumFlushed down
// by the operation name reported in the response item.
NumIndexed uint64
NumCreated uint64
NumUpdated uint64
NumDeleted uint64
// NumRequests counts bulk requests issued by the workers.
NumRequests uint64
// FlushedBytes sums the sizes of request buffers whose response was processed.
FlushedBytes uint64
}
// BulkIndexerItem represents an indexer item: one action of a bulk request,
// together with its optional body and result callbacks.
type BulkIndexerItem struct {
// Index is written as "_index" in the action metadata when non-empty.
Index string
// Action is the name of the bulk operation; it is the key of the action line.
Action string
// DocumentID is written as "_id" in the action metadata when non-empty.
DocumentID string
// Routing is written as "routing" in the action metadata when non-empty.
Routing string
// RequireAlias is written as "require_alias" in the action metadata when true.
RequireAlias bool
// Version and VersionType are only written when DocumentID is set.
Version *int64
VersionType string
// Body is the optional payload following the action line. It must be
// seekable: its length is measured with Seek and it is rewound after use.
Body io.ReadSeeker
// RetryOnConflict is only written for the "update" action.
RetryOnConflict *int
// IfSeqNo and IfPrimaryTerm are only written when both are set and
// DocumentID is set.
IfSeqNo *int64
IfPrimaryTerm *int64
meta bytes.Buffer // Item metadata header (the serialized action line)
payloadLength int // Item payload total length: metadata + newline + body length
// OnSuccess is called for an item reported as successful in the bulk response.
OnSuccess func(context.Context, BulkIndexerItem, BulkIndexerResponseItem) // Per item
// OnFailure is called for a failed item. The error argument is nil when
// Elasticsearch rejected the item and the response item carries the details;
// it is non-nil when the item failed before a per-item response was
// available, in which case the response item is empty.
OnFailure func(context.Context, BulkIndexerItem, BulkIndexerResponseItem, error) // Per item
}
// marshallMeta serializes the item metadata into item.meta as the action line
// of the Bulk API, terminated by a newline:
//
//	{"<action>":{"_id":"...","routing":"...","_index":"...", ...}}
//
// Fields are emitted in a fixed order and only when set:
//   - "_id", then "version" and "version_type" (the latter two only with an ID)
//   - "routing", "_index"
//   - "retry_on_conflict" (only for the "update" action)
//   - "require_alias"
//   - "if_seq_no" and "if_primary_term" (only when both are set, with an ID)
//
// The buffer is rebuilt from scratch on every call, so an item that already
// carries metadata (for example one received in a callback and added again)
// does not end up with two action lines.
//
// NOTE(review): values are quoted with strconv.AppendQuote, i.e. Go string
// literal syntax. That matches JSON for ordinary printable text; confirm that
// callers never pass control characters or invalid UTF-8 in these fields.
func (item *BulkIndexerItem) marshallMeta() {
	// Use a fresh buffer instead of Reset: a copied item shares the backing
	// array of its meta buffer with the copy it was made from.
	item.meta = bytes.Buffer{}

	// Scratch space for the strconv.Append* helpers, large enough for most
	// values. It is never reassigned, so it always has length zero.
	scratch := make([]byte, 0, 256)

	// field writes the separator (if any field precedes this one) and the key.
	// Tracking the separator in one place keeps the output valid JSON for
	// every combination of optional fields.
	needComma := false
	field := func(key string) {
		if needComma {
			item.meta.WriteByte(',')
		}
		needComma = true
		item.meta.WriteString(key)
	}

	hasID := item.DocumentID != ""

	item.meta.WriteByte('{')
	item.meta.Write(strconv.AppendQuote(scratch, item.Action))
	item.meta.WriteString(":{")

	if hasID {
		field(`"_id":`)
		item.meta.Write(strconv.AppendQuote(scratch, item.DocumentID))
	}

	if hasID && item.Version != nil {
		field(`"version":`)
		item.meta.Write(strconv.AppendInt(scratch, *item.Version, 10))
	}

	if hasID && item.VersionType != "" {
		field(`"version_type":`)
		item.meta.Write(strconv.AppendQuote(scratch, item.VersionType))
	}

	if item.Routing != "" {
		field(`"routing":`)
		item.meta.Write(strconv.AppendQuote(scratch, item.Routing))
	}

	if item.Index != "" {
		field(`"_index":`)
		item.meta.Write(strconv.AppendQuote(scratch, item.Index))
	}

	if item.RetryOnConflict != nil && item.Action == "update" {
		field(`"retry_on_conflict":`)
		item.meta.Write(strconv.AppendInt(scratch, int64(*item.RetryOnConflict), 10))
	}

	if item.RequireAlias {
		field(`"require_alias":`)
		item.meta.WriteString("true")
	}

	if hasID && item.IfSeqNo != nil && item.IfPrimaryTerm != nil {
		field(`"if_seq_no":`)
		item.meta.Write(strconv.AppendInt(scratch, *item.IfSeqNo, 10))
		field(`"if_primary_term":`)
		item.meta.Write(strconv.AppendInt(scratch, *item.IfPrimaryTerm, 10))
	}

	item.meta.WriteString("}}\n")
}
// computeLength calculates the payload size of the item and stores it in
// item.payloadLength: the length of the serialized metadata, plus the length
// of the body (if any), plus one byte for the newline at the end of the payload.
//
// The body length is measured by seeking to its end; the body is rewound to
// the start afterwards. An error from either Seek is returned unchanged and
// leaves payloadLength untouched.
//
// The value is assigned rather than accumulated, so calling it again on an
// item that already has a length (for example one received in a callback and
// added again) does not double-count.
func (item *BulkIndexerItem) computeLength() error {
	// Metadata plus one byte to account for the newline at the end of payload.
	length := item.meta.Len() + 1

	if item.Body != nil {
		n, err := item.Body.Seek(0, io.SeekEnd)
		if err != nil {
			return err
		}
		if _, err = item.Body.Seek(0, io.SeekStart); err != nil {
			return err
		}
		length += int(n)
	}

	item.payloadLength = length
	return nil
}
// BulkIndexerResponse represents the Elasticsearch response to a bulk request.
type BulkIndexerResponse struct {
// Took is decoded from the "took" property of the response.
Took int `json:"took"`
// HasErrors is decoded from the "errors" property of the response.
HasErrors bool `json:"errors"`
// Items holds one single-entry map per action, keyed by the operation name,
// e.g. [ { "index": { ... } }, { "create": { ... } }, ... ].
Items []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
}
// BulkIndexerResponseItem represents the Elasticsearch response item,
// i.e. the outcome of a single action of a bulk request.
type BulkIndexerResponseItem struct {
Index string `json:"_index"`
DocumentID string `json:"_id"`
Version int64 `json:"_version"`
Result string `json:"result"`
// Status is the per-item status; the indexer treats values above 201 as a failure.
Status int `json:"status"`
SeqNo int64 `json:"_seq_no"`
PrimTerm int64 `json:"_primary_term"`
FailureStore string `json:"failure_store,omitempty"`
// Shards is decoded from the "_shards" object of the response item.
Shards struct {
Total int `json:"total"`
Successful int `json:"successful"`
Failed int `json:"failed"`
} `json:"_shards"`
// Error is decoded from the "error" object of the response item.
// The indexer treats a non-empty Type as a failure.
Error struct {
Type string `json:"type"`
Reason string `json:"reason"`
// Cause is decoded from the nested "caused_by" object.
Cause struct {
Type string `json:"type"`
Reason string `json:"reason"`
} `json:"caused_by"`
} `json:"error,omitempty"`
}
// BulkResponseJSONDecoder defines the interface for custom JSON decoders.
//
// The indexer calls UnmarshalFromReader with the body of a bulk response and
// a pointer to the value to populate; a returned error fails the flush.
type BulkResponseJSONDecoder interface {
UnmarshalFromReader(io.Reader, *BulkIndexerResponse) error
}
// BulkIndexerDebugLogger defines the interface for a debugging logger.
//
// Printf receives a format string and its arguments. The workers call it for
// start-up, received items and flushes, including the full request buffer.
type BulkIndexerDebugLogger interface {
Printf(string, ...interface{})
}
// bulkIndexer is the BulkIndexer implementation returned by NewBulkIndexer.
type bulkIndexer struct {
// wg tracks the running workers; each worker calls Done when it stops.
wg sync.WaitGroup
// queue carries items from Add to the workers; closing it stops the workers.
queue chan BulkIndexerItem
// workers holds the workers started by init.
workers []*worker
// stats holds the counters, which are updated with atomic operations.
stats *bulkIndexerStats
// config is the configuration with the defaults applied.
config BulkIndexerConfig
}
// bulkIndexerStats holds the internal counters behind BulkIndexerStats.
// All fields are read and written with sync/atomic functions.
type bulkIndexerStats struct {
numAdded uint64
numFlushed uint64
numFailed uint64
numIndexed uint64
numCreated uint64
numUpdated uint64
numDeleted uint64
numRequests uint64
flushedBytes uint64
}
// NewBulkIndexer creates a new bulk indexer and starts its workers.
//
// Unset configuration values are replaced by defaults:
//   - Client: elasticsearch.NewDefaultClient()
//   - Decoder: a decoder based on encoding/json
//   - NumWorkers: runtime.NumCPU()
//   - FlushBytes: 5MB
//   - FlushInterval: 30 seconds
//
// Non-positive NumWorkers, FlushBytes and FlushInterval are treated as unset,
// because negative values cannot be used to size the queue and the buffers or
// to create the flush ticker.
//
// It returns an error when no Client is configured and the default client
// cannot be created.
func NewBulkIndexer(cfg BulkIndexerConfig) (BulkIndexer, error) {
	if cfg.Client == nil {
		client, err := elasticsearch.NewDefaultClient()
		if err != nil {
			// Do not store the result: a nil *Client inside the interface
			// would look configured and fail only on the first flush.
			return nil, fmt.Errorf("creating default client: %w", err)
		}
		cfg.Client = client
	}

	if cfg.Decoder == nil {
		cfg.Decoder = defaultJSONDecoder{}
	}

	if cfg.NumWorkers <= 0 {
		cfg.NumWorkers = runtime.NumCPU()
	}

	if cfg.FlushBytes <= 0 {
		cfg.FlushBytes = 5e+6
	}

	if cfg.FlushInterval <= 0 {
		cfg.FlushInterval = 30 * time.Second
	}

	bi := bulkIndexer{
		config: cfg,
		stats:  &bulkIndexerStats{},
	}

	bi.init()

	return &bi, nil
}
// Add queues an item for indexing.
//
// The item metadata is serialized and the payload length computed before the
// item is handed to the workers. If the context is done before the item can be
// queued, the context error is reported to OnError and returned.
//
// Adding an item after a call to Close() will panic.
func (bi *bulkIndexer) Add(ctx context.Context, item BulkIndexerItem) error {
	atomic.AddUint64(&bi.stats.numAdded, 1)

	// Prepare the action line and the size used for flush decisions.
	item.marshallMeta()
	if lenErr := item.computeLength(); lenErr != nil {
		return lenErr
	}

	select {
	case bi.queue <- item:
		return nil
	case <-ctx.Done():
	}

	// The context ended before a worker could take the item.
	if report := bi.config.OnError; report != nil {
		report(ctx, ctx.Err())
	}
	return ctx.Err()
}
// Close closes the indexer queue channel, which triggers the workers to flush
// their buffers and stop, and waits for them to finish.
//
// The wait is bounded by ctx: when the context is already done, or becomes
// done while workers are still flushing, the context error is reported to
// OnError and returned. The workers keep running in that case and finish
// their final flush in the background.
//
// Close must be called only once, and only after all calls to Add have returned.
func (bi *bulkIndexer) Close(ctx context.Context) error {
	close(bi.queue)

	// An already finished context fails immediately, without waiting.
	if err := ctx.Err(); err != nil {
		if bi.config.OnError != nil {
			bi.config.OnError(ctx, err)
		}
		return err
	}

	// sync.WaitGroup cannot be waited on in a select, so wait in a goroutine
	// and signal completion through a channel. The goroutine ends as soon as
	// the last worker has stopped.
	done := make(chan struct{})
	go func() {
		bi.wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-ctx.Done():
		if bi.config.OnError != nil {
			bi.config.OnError(ctx, ctx.Err())
		}
		return ctx.Err()
	}
}
// Stats returns a snapshot of the indexer statistics.
// Each counter is loaded atomically, one after another.
func (bi *bulkIndexer) Stats() BulkIndexerStats {
	counters := bi.stats

	var snapshot BulkIndexerStats
	snapshot.NumAdded = atomic.LoadUint64(&counters.numAdded)
	snapshot.NumFlushed = atomic.LoadUint64(&counters.numFlushed)
	snapshot.NumFailed = atomic.LoadUint64(&counters.numFailed)
	snapshot.NumIndexed = atomic.LoadUint64(&counters.numIndexed)
	snapshot.NumCreated = atomic.LoadUint64(&counters.numCreated)
	snapshot.NumUpdated = atomic.LoadUint64(&counters.numUpdated)
	snapshot.NumDeleted = atomic.LoadUint64(&counters.numDeleted)
	snapshot.NumRequests = atomic.LoadUint64(&counters.numRequests)
	snapshot.FlushedBytes = atomic.LoadUint64(&counters.flushedBytes)

	return snapshot
}
// init creates the item queue and starts the configured number of workers.
//
// The queue is buffered with one slot per worker. Every worker gets its own
// buffer, pre-sized to the flush threshold, and its own flush ticker.
func (bi *bulkIndexer) init() {
	numWorkers := bi.config.NumWorkers

	bi.queue = make(chan BulkIndexerItem, numWorkers)

	for id := 1; id <= numWorkers; id++ {
		// Registered before the worker starts; the worker calls Done on exit.
		bi.wg.Add(1)

		wrk := &worker{
			id:     id,
			ch:     bi.queue,
			bi:     bi,
			buf:    bytes.NewBuffer(make([]byte, 0, bi.config.FlushBytes)),
			ticker: time.NewTicker(bi.config.FlushInterval),
		}
		wrk.run()

		bi.workers = append(bi.workers, wrk)
	}
}
// worker represents an indexer worker. It receives items from the queue,
// accumulates them in its buffer and sends the buffer as a bulk request.
type worker struct {
// id identifies the worker in debug log messages.
id int
// ch is the receive side of the indexer queue.
ch <-chan BulkIndexerItem
// bi is the indexer that owns the worker.
bi *bulkIndexer
// buf holds the body of the next bulk request.
buf *bytes.Buffer
// items holds the items written to buf, in order; response items are
// matched to them by position.
items []BulkIndexerItem
// ticker triggers the periodic flush and is reset after every flush.
ticker *time.Ticker
}
// run launches the worker in a goroutine.
//
// The worker loops until the queue is closed, handling two events:
//   - a tick of the flush ticker flushes the buffer;
//   - a received item is appended to the buffer, flushing first when the item
//     would push the buffer to the flush threshold, and flushing right after
//     when the item alone reaches the threshold.
//
// On exit the worker flushes once more, stops its ticker and signals the
// indexer wait group.
func (w *worker) run() {
	go func() {
		ctx := context.Background()

		if w.bi.config.DebugLogger != nil {
			w.bi.config.DebugLogger.Printf("[worker-%03d] Started\n", w.id)
		}

		defer func() {
			w.flush(ctx)
			w.ticker.Stop()
			w.bi.wg.Done()
		}()

		for {
			select {
			case <-w.ticker.C:
				if w.bi.config.DebugLogger != nil {
					w.bi.config.DebugLogger.Printf("[worker-%03d] Auto-flushing after %s\n",
						w.id, w.bi.config.FlushInterval)
				}
				w.flush(ctx)

			case item, ok := <-w.ch:
				if !ok {
					return
				}

				if w.bi.config.DebugLogger != nil {
					w.bi.config.DebugLogger.Printf("[worker-%03d] Received item [%s:%s]\n", w.id, item.Action, item.DocumentID)
				}

				oversizePayload := w.bi.config.FlushBytes <= item.payloadLength
				if !oversizePayload && w.buf.Len() > 0 && w.buf.Len()+item.payloadLength >= w.bi.config.FlushBytes {
					// The buffer is emptied by this flush whether or not the
					// request succeeds, and a failure is reported by flush
					// itself. The item just received was not part of that
					// batch, so it must still be buffered rather than dropped.
					w.flush(ctx)
				}

				// Remember where this item starts, so a failed write does not
				// leave an action line or a partial body behind; that would
				// shift every following item against its response.
				start := w.buf.Len()

				err := w.writeMeta(&item)
				if err == nil {
					err = w.writeBody(&item)
				}
				if err != nil {
					w.buf.Truncate(start)
					if item.OnFailure != nil {
						item.OnFailure(ctx, item, BulkIndexerResponseItem{}, err)
					}
					atomic.AddUint64(&w.bi.stats.numFailed, 1)
					continue
				}

				w.items = append(w.items, item)

				// Should the item payload exceed the configured FlushBytes flush happens instantly.
				if oversizePayload {
					if w.bi.config.DebugLogger != nil {
						w.bi.config.DebugLogger.Printf("[worker-%03d] Oversize Payload in item [%s:%s]\n", w.id, item.Action, item.DocumentID)
					}
					w.flush(ctx)
				}
			}
		}
	}()
}
// writeMeta appends the serialized metadata (action line) of the item to the
// worker buffer and returns the error of that write, if any.
func (w *worker) writeMeta(item *BulkIndexerItem) error {
	_, err := w.buf.Write(item.meta.Bytes())
	return err
}
// writeBody appends the body of the item, followed by a newline, to the
// worker buffer. Items without a body write nothing.
//
// A read error is reported to OnError and returned. After a successful read
// the body is rewound to its start; the result of that seek is not checked.
func (w *worker) writeBody(item *BulkIndexerItem) error {
	if item.Body == nil {
		return nil
	}

	_, readErr := w.buf.ReadFrom(item.Body)
	if readErr != nil {
		if report := w.bi.config.OnError; report != nil {
			report(context.Background(), readErr)
		}
		return readErr
	}

	// Best-effort rewind so the body can be read again later.
	_, _ = item.Body.Seek(0, io.SeekStart)
	w.buf.WriteByte('\n')

	return nil
}
// flush sends the worker buffer and reports a failure to OnError.
// The flush ticker is restarted in every case, so the next periodic flush
// happens one full interval after this one.
// It returns true when the buffer was flushed without error.
func (w *worker) flush(ctx context.Context) bool {
	err := w.flushBuffer(ctx)
	if err != nil && w.bi.config.OnError != nil {
		w.bi.config.OnError(ctx, err)
	}

	w.ticker.Reset(w.bi.config.FlushInterval)

	return err == nil
}
// flushBuffer sends the worker buffer as one bulk request and dispatches the
// per-item results.
//
// OnFlushStart and OnFlushEnd are called around every invocation, including
// when the buffer is empty and nothing is sent. Once a request is attempted,
// the buffer and the pending items are reset when the function returns,
// whether or not the request succeeded.
//
// Outcomes:
//   - Transport error or error response: every pending item is counted as
//     failed and its OnFailure callback is called with the error and an empty
//     response item. The error is returned.
//   - Undecodable response body: the error is returned; no per-item callbacks
//     are called, because the outcome of the items is unknown.
//   - Otherwise response items are matched to pending items by position, and
//     OnSuccess or OnFailure is called for each. An item fails when the
//     response carries an error type or a status above 201.
//
// Errors are returned and not passed to OnError here: flush, the caller,
// reports the returned error, and reporting it in both places would deliver
// every failure twice.
func (w *worker) flushBuffer(ctx context.Context) error {
	if w.bi.config.OnFlushStart != nil {
		ctx = w.bi.config.OnFlushStart(ctx)
	}

	if w.bi.config.OnFlushEnd != nil {
		defer func() { w.bi.config.OnFlushEnd(ctx) }()
	}

	bufLen := w.buf.Len()
	if bufLen < 1 {
		if w.bi.config.DebugLogger != nil {
			w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: Buffer empty\n", w.id)
		}
		return nil
	}

	defer func() {
		w.items = nil
		// Do not keep a buffer that grew beyond the threshold (oversize
		// payload); go back to the configured capacity instead.
		if w.buf.Cap() > w.bi.config.FlushBytes {
			w.buf = bytes.NewBuffer(make([]byte, 0, w.bi.config.FlushBytes))
		} else {
			w.buf.Reset()
		}
	}()

	if w.bi.config.DebugLogger != nil {
		w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: %s\n", w.id, w.buf.String())
	}

	atomic.AddUint64(&w.bi.stats.numRequests, 1)

	req := esapi.BulkRequest{
		Index: w.bi.config.Index,
		Body:  w.buf,

		Pipeline:            w.bi.config.Pipeline,
		Refresh:             w.bi.config.Refresh,
		Routing:             w.bi.config.Routing,
		Source:              w.bi.config.Source,
		SourceExcludes:      w.bi.config.SourceExcludes,
		SourceIncludes:      w.bi.config.SourceIncludes,
		Timeout:             w.bi.config.Timeout,
		WaitForActiveShards: w.bi.config.WaitForActiveShards,

		Pretty:     w.bi.config.Pretty,
		Human:      w.bi.config.Human,
		ErrorTrace: w.bi.config.ErrorTrace,
		FilterPath: w.bi.config.FilterPath,
		Header:     w.bi.config.Header.Clone(),
	}

	if w.bi.config.RequireAlias {
		req.RequireAlias = &w.bi.config.RequireAlias
	}

	// Add Header and MetaHeader to config if not already set
	if req.Header == nil {
		req.Header = http.Header{}
	}
	req.Header.Set(elasticsearch.HeaderClientMeta, "h=bp")

	// failBatch marks every pending item as failed with the given error and
	// notifies the item callbacks; it returns the error for convenience.
	failBatch := func(batchErr error) error {
		atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
		for i := range w.items {
			if w.items[i].OnFailure != nil {
				w.items[i].OnFailure(ctx, w.items[i], BulkIndexerResponseItem{}, batchErr)
			}
		}
		return batchErr
	}

	res, err := req.Do(ctx, w.bi.config.Client)
	if err != nil {
		return failBatch(fmt.Errorf("flush: %w", err))
	}
	if res.Body != nil {
		defer res.Body.Close()
	}

	if res.IsError() {
		// TODO(karmi): Wrap error (include response struct)
		return failBatch(fmt.Errorf("flush: %s", res.String()))
	}

	var blk BulkIndexerResponse
	if err := w.bi.config.Decoder.UnmarshalFromReader(res.Body, &blk); err != nil {
		// TODO(karmi): Wrap error (include response struct)
		return fmt.Errorf("flush: error parsing response body: %w", err)
	}

	for i, blkItem := range blk.Items {
		// A response listing more items than were sent cannot be matched;
		// stop instead of indexing past the pending items.
		if i >= len(w.items) {
			break
		}

		var (
			info BulkIndexerResponseItem
			op   string
		)
		item := w.items[i]

		// The Elasticsearch bulk response contains an array of maps like this:
		//   [ { "index": { ... } }, { "create": { ... } }, ... ]
		// We range over the map, to set the first key and value as "op" and "info".
		for k, v := range blkItem {
			op = k
			info = v
		}

		if info.Error.Type != "" || info.Status > 201 {
			atomic.AddUint64(&w.bi.stats.numFailed, 1)
			if item.OnFailure != nil {
				item.OnFailure(ctx, item, info, nil)
			}
			continue
		}

		atomic.AddUint64(&w.bi.stats.numFlushed, 1)

		switch op {
		case "index":
			atomic.AddUint64(&w.bi.stats.numIndexed, 1)
		case "create":
			atomic.AddUint64(&w.bi.stats.numCreated, 1)
		case "delete":
			atomic.AddUint64(&w.bi.stats.numDeleted, 1)
		case "update":
			atomic.AddUint64(&w.bi.stats.numUpdated, 1)
		}

		if item.OnSuccess != nil {
			item.OnSuccess(ctx, item, info)
		}
	}

	atomic.AddUint64(&w.bi.stats.flushedBytes, uint64(bufLen))

	return nil
}
// defaultJSONDecoder is the BulkResponseJSONDecoder used when no custom
// decoder is configured. It decodes with encoding/json.
type defaultJSONDecoder struct{}

// UnmarshalFromReader decodes one JSON value from r into blk.
func (d defaultJSONDecoder) UnmarshalFromReader(r io.Reader, blk *BulkIndexerResponse) error {
	dec := json.NewDecoder(r)
	return dec.Decode(blk)
}