internal/pkg/bulk/opMulti.go (84 lines of code) (raw):
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.
package bulk
import (
"context"
"errors"
"math"
)
// TODO: Are multi requests used by anything? A quick grep shows no hits
// outside the bulk package.

// MCreate queues a create action for every op in ops and blocks until a
// response has been received for each one (or ctx is done). The returned
// items are in the same order as ops. Any opts are applied to every op.
func (b *Bulker) MCreate(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error) {
	// Forward opts; previously they were accepted but silently dropped.
	return b.multiWaitBulkOp(ctx, ActionCreate, ops, opts...)
}
// MIndex queues an index action for every op in ops and blocks until a
// response has been received for each one (or ctx is done). The returned
// items are in the same order as ops. Any opts are applied to every op.
func (b *Bulker) MIndex(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error) {
	// Forward opts; previously they were accepted but silently dropped.
	return b.multiWaitBulkOp(ctx, ActionIndex, ops, opts...)
}
// MUpdate queues an update action for every op in ops and blocks until a
// response has been received for each one (or ctx is done). The returned
// items are in the same order as ops. Any opts are applied to every op.
func (b *Bulker) MUpdate(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error) {
	// Forward opts; previously they were accepted but silently dropped.
	return b.multiWaitBulkOp(ctx, ActionUpdate, ops, opts...)
}
// MDelete queues a delete action for every op in ops and blocks until a
// response has been received for each one (or ctx is done). The returned
// items are in the same order as ops. Any opts are applied to every op.
func (b *Bulker) MDelete(ctx context.Context, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error) {
	// Forward opts; previously they were accepted but silently dropped.
	return b.multiWaitBulkOp(ctx, ActionDelete, ops, opts...)
}
// multiWaitBulkOp serializes every op in ops into one shared buffer,
// dispatches one bulkT per op to the bulker's run loop, and blocks until a
// response has been received for each op or ctx is done.
//
// On success the returned items are indexed in the same order as ops. If any
// op reported an error, the last such error is returned alongside the items.
// If serialization fails, or ctx is done while dispatching or waiting, it
// returns nil and the error.
func (b *Bulker) multiWaitBulkOp(ctx context.Context, action actionT, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error) { //nolint:unparam // better to keep consistency with other methods
	if len(ops) == 0 {
		return nil, nil
	}
	// Each op's position is carried through the pipeline as an int32 (bulkT.idx)
	// and used to index the result slice, so it must fit in an int32. A larger
	// bound would let the conversion below wrap to a negative index.
	if len(ops) > math.MaxInt32 {
		return nil, errors.New("too many bulk ops")
	}

	// Build a fresh slice rather than appending to opts directly: opts may share
	// a backing array with the caller's slice, and append could write into it.
	allOpts := make([]Opt, 0, len(opts)+1)
	allOpts = append(allOpts, opts...)
	allOpts = append(allOpts, withAPMLinkedContext(ctx))
	opt := b.parseOpts(allOpts...)

	// Contract is that the consumer never blocks, so preallocate room for every
	// response. Could consider making the response channel *respT to limit
	// memory usage.
	ch := make(chan respT, len(ops))

	actionStr := action.String()

	// O(n): determine how much space the serialized requests need.
	var byteCnt int
	for i := range ops {
		op := &ops[i]
		byteCnt += b.calcBulkSz(actionStr, op.Index, op.ID, opt.RetryOnConflict, op.Body)
	}

	// Serialize every request into a single buffer to reduce heap pressure.
	// If the estimate above is wrong, the Buf object has the property that
	// previously cached slices remain valid; however, underestimating the size
	// can lead to multiple copies, which undermines the optimization.
	var bulkBuf Buf
	bulkBuf.Grow(byteCnt)

	bulks := make([]bulkT, len(ops))
	for i := range ops {
		bufIdx := bulkBuf.Len()
		op := &ops[i]

		if err := b.writeBulkMeta(&bulkBuf, actionStr, op.Index, op.ID, opt.RetryOnConflict); err != nil {
			return nil, err
		}
		if err := b.writeBulkBody(&bulkBuf, action, op.Body); err != nil {
			return nil, err
		}

		bodySlice := bulkBuf.Bytes()[bufIdx:]

		blk := &bulks[i]
		blk.ch = ch
		blk.idx = int32(i) //nolint:gosec // disable G115: len(ops) <= math.MaxInt32 is checked above
		blk.action = action
		blk.buf.Set(bodySlice)
		if opt.Refresh {
			blk.flags.Set(flagRefresh)
		}
	}

	// Dispatch requests.
	if err := b.multiDispatch(ctx, bulks); err != nil {
		return nil, err
	}

	// Wait for one response per op and place each at its original position.
	var lastErr error
	items := make([]BulkIndexerResponseItem, len(ops))
	for range ops {
		select {
		case r := <-ch:
			if r.err != nil {
				lastErr = r.err
			}
			if r.data != nil {
				items[r.idx] = *r.data.(*BulkIndexerResponseItem)
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	return items, lastErr
}
// multiDispatch sends a pointer to each element of blks, in order, to the
// bulker's run loop channel. The receiver holds pointers into blks, so the
// caller should not modify blks while those bulks are in flight. If ctx is
// done before every bulk has been sent, it returns ctx.Err(); bulks already
// sent are not recalled.
func (b *Bulker) multiDispatch(ctx context.Context, blks []bulkT) error {
	done := ctx.Done()
	for idx := 0; idx < len(blks); idx++ {
		// Take the address of the slice element itself, not of a loop copy.
		next := &blks[idx]
		select {
		case <-done:
			return ctx.Err()
		case b.ch <- next:
		}
	}
	return nil
}