in opensearchutil/bulk_indexer.go [468:588]
// flush sends the worker's buffered payload to the cluster as a single Bulk
// request and dispatches the per-item results.
//
// If config.OnFlushStart is set it is called first and may replace ctx;
// if config.OnFlushEnd is set it runs when flush returns, including when the
// buffer is empty. Once a request is attempted, w.items and w.buf are reset on
// return regardless of the outcome.
//
// flush returns nil when the buffer is empty or the response was decoded, and
// a non-nil error when the request fails, the response has an error status, or
// the response body cannot be decoded. Failures of individual items are not
// returned; they are reflected in the stats counters and reported through
// each item's OnFailure callback.
func (w *worker) flush(ctx context.Context) error {
	if w.bi.config.OnFlushStart != nil {
		ctx = w.bi.config.OnFlushStart(ctx)
	}
	if w.bi.config.OnFlushEnd != nil {
		// The closure reads ctx at return time, so it sees the context
		// produced by OnFlushStart.
		defer func() { w.bi.config.OnFlushEnd(ctx) }()
	}

	if w.buf.Len() < 1 {
		if w.bi.config.DebugLogger != nil {
			w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: Buffer empty\n", w.id)
		}
		return nil
	}

	var blk BulkIndexerResponse

	// Whatever happens below, the batch is considered consumed: keep the
	// backing storage but drop the contents.
	defer func() {
		w.items = w.items[:0]
		w.buf.Reset()
	}()

	if w.bi.config.DebugLogger != nil {
		w.bi.config.DebugLogger.Printf("[worker-%03d] Flush: %s\n", w.id, w.buf.String())
	}

	atomic.AddUint64(&w.bi.stats.numRequests, 1)

	req := opensearchapi.BulkRequest{
		Index: w.bi.config.Index,
		Body:  w.buf,

		Pipeline:            w.bi.config.Pipeline,
		Refresh:             w.bi.config.Refresh,
		Routing:             w.bi.config.Routing,
		Source:              w.bi.config.Source,
		SourceExcludes:      w.bi.config.SourceExcludes,
		SourceIncludes:      w.bi.config.SourceIncludes,
		Timeout:             w.bi.config.Timeout,
		WaitForActiveShards: w.bi.config.WaitForActiveShards,

		Pretty:     w.bi.config.Pretty,
		Human:      w.bi.config.Human,
		ErrorTrace: w.bi.config.ErrorTrace,
		FilterPath: w.bi.config.FilterPath,
		Header:     w.bi.config.Header,
	}

	res, err := req.Do(ctx, w.bi.config.Client)
	if err != nil {
		// The whole batch is counted as failed when the request itself fails.
		atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
		flushErr := fmt.Errorf("flush: %s", err)
		if w.bi.config.OnError != nil {
			w.bi.config.OnError(ctx, flushErr)
		}
		return flushErr
	}
	if res.Body != nil {
		defer res.Body.Close()
	}

	if res.IsError() {
		atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
		// TODO(karmi): Wrap error (include response struct)
		// err is nil on this path, so the message must come from the response.
		// Build it once so the callback and the caller see the same error.
		flushErr := fmt.Errorf("flush: %s", res.String())
		if w.bi.config.OnError != nil {
			w.bi.config.OnError(ctx, flushErr)
		}
		return flushErr
	}

	if err := w.bi.config.Decoder.UnmarshalFromReader(res.Body, &blk); err != nil {
		// TODO(karmi): Wrap error (include response struct)
		if w.bi.config.OnError != nil {
			w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
		}
		return fmt.Errorf("flush: error parsing response body: %s", err)
	}

	for i, blkItem := range blk.Items {
		var (
			item BulkIndexerItem
			info BulkIndexerResponseItem
			op   string
		)

		// Response entries are matched to submitted items by position. If the
		// response carries more entries than were submitted, keep the zero
		// item (no callbacks) instead of indexing out of range.
		if i < len(w.items) {
			item = w.items[i]
		}

		// The OpenSearch bulk response contains an array of maps like this:
		//   [ { "index": { ... } }, { "create": { ... } }, ... ]
		// We range over the map, to set the first key and value as "op" and "info".
		for k, v := range blkItem {
			op = k
			info = v
		}

		if info.Error.Type != "" || info.Status > 201 {
			atomic.AddUint64(&w.bi.stats.numFailed, 1)
			if item.OnFailure != nil {
				item.OnFailure(ctx, item, info, nil)
			}
			continue
		}

		atomic.AddUint64(&w.bi.stats.numFlushed, 1)

		switch op {
		case "index":
			atomic.AddUint64(&w.bi.stats.numIndexed, 1)
		case "create":
			atomic.AddUint64(&w.bi.stats.numCreated, 1)
		case "delete":
			atomic.AddUint64(&w.bi.stats.numDeleted, 1)
		case "update":
			atomic.AddUint64(&w.bi.stats.numUpdated, 1)
		}

		if item.OnSuccess != nil {
			item.OnSuccess(ctx, item, info)
		}
	}

	// Request-level failures returned above; item-level failures are not
	// surfaced through the return value.
	return nil
}