func()

in internal/pkg/bulk/opMulti.go [31:114]


func (b *Bulker) multiWaitBulkOp(ctx context.Context, action actionT, ops []MultiOp, opts ...Opt) ([]BulkIndexerResponseItem, error) { //nolint:unparam // better to keep consistency with other methods
	if len(ops) == 0 {
		return nil, nil
	}

	if uint(len(ops)) > math.MaxUint32 {
		return nil, errors.New("too many bulk ops")
	}

	opt := b.parseOpts(append(opts, withAPMLinkedContext(ctx))...)

	// Contract is that consumer never blocks, so must preallocate.
	// Could consider making the response channel *respT to limit memory usage.
	ch := make(chan respT, len(ops))

	actionStr := action.String()

	// O(n) Determine how much space we need
	var byteCnt int
	for _, op := range ops {
		byteCnt += b.calcBulkSz(actionStr, op.Index, op.ID, opt.RetryOnConflict, op.Body)
	}

	// Create one bulk buffer to serialize each piece.
	// This decreases pressure on the heap. If we calculculate wrong,
	// the Buf objectect has the property that previously cached slices
	// are still valid.  However, underestimating the buffer size
	// can lead to multiple copies, which undermines the optimization.
	var bulkBuf Buf
	bulkBuf.Grow(byteCnt)

	// Serialize requests
	bulks := make([]bulkT, len(ops))
	for i := range ops {

		bufIdx := bulkBuf.Len()

		op := &ops[i]

		if err := b.writeBulkMeta(&bulkBuf, actionStr, op.Index, op.ID, opt.RetryOnConflict); err != nil {
			return nil, err
		}

		if err := b.writeBulkBody(&bulkBuf, action, op.Body); err != nil {
			return nil, err
		}

		bodySlice := bulkBuf.Bytes()[bufIdx:]

		bulk := &bulks[i]
		bulk.ch = ch
		bulk.idx = int32(i) //nolint:gosec // disable G115
		bulk.action = action
		bulk.buf.Set(bodySlice)
		if opt.Refresh {
			bulk.flags.Set(flagRefresh)
		}
	}

	// Dispatch requests
	if err := b.multiDispatch(ctx, bulks); err != nil {
		return nil, err
	}

	// Wait for response and populate return slice
	var lastErr error
	items := make([]BulkIndexerResponseItem, len(ops))

	for i := 0; i < len(ops); i++ {
		select {
		case r := <-ch:
			if r.err != nil {
				lastErr = r.err
			}
			if r.data != nil {
				items[r.idx] = *r.data.(*BulkIndexerResponseItem)
			}
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}

	return items, lastErr
}