internal/pkg/bulk/opBulk.go (231 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package bulk import ( "bytes" "context" "errors" "fmt" "time" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/mailru/easyjson" "github.com/rs/zerolog" "go.elastic.co/apm/v2" "github.com/elastic/fleet-server/v7/internal/pkg/es" ) func (b *Bulker) Create(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) { item, err := b.waitBulkAction(ctx, ActionCreate, index, id, body, opts...) if err != nil { return "", err } return item.DocumentID, nil } func (b *Bulker) Index(ctx context.Context, index, id string, body []byte, opts ...Opt) (string, error) { item, err := b.waitBulkAction(ctx, ActionIndex, index, id, body, opts...) if err != nil { return "", err } return item.DocumentID, nil } func (b *Bulker) Update(ctx context.Context, index, id string, body []byte, opts ...Opt) error { _, err := b.waitBulkAction(ctx, ActionUpdate, index, id, body, opts...) return err } func (b *Bulker) Delete(ctx context.Context, index, id string, opts ...Opt) error { _, err := b.waitBulkAction(ctx, ActionDelete, index, id, nil, opts...) return err } func (b *Bulker) waitBulkAction(ctx context.Context, action actionT, index, id string, body []byte, opts ...Opt) (*BulkIndexerResponseItem, error) { span, ctx := apm.StartSpan(ctx, fmt.Sprintf("Bulker: %s", action.String()), "bulker") defer span.End() opt := b.parseOpts(append(opts, withAPMLinkedContext(ctx))...) blk := b.newBlk(action, opt) // Serialize request const kSlop = 64 blk.buf.Grow(len(body) + kSlop) if err := b.writeBulkMeta(&blk.buf, action.String(), index, id, opt.RetryOnConflict); err != nil { return nil, err } if err := b.writeBulkBody(&blk.buf, action, body); err != nil { return nil, err } // Dispatch and wait for response resp := b.dispatch(ctx, blk) if resp.err != nil { return nil, resp.err } b.freeBlk(blk) // TODO if we can ever set span links after creation we can inject the flushQueue span into resp and link to the action span here. r, ok := resp.data.(*BulkIndexerResponseItem) if !ok { return nil, fmt.Errorf("unable to cast to *BulkIndexerResponseItem, detected type %T", resp.data) } if err := es.TranslateError(r.Status, r.Error); err != nil { return nil, err } return r, nil } func (b *Bulker) writeMget(buf *Buf, index, id string) error { if err := b.validateMeta(index, id); err != nil { return err } _, _ = buf.WriteString(`{"_index":"`) _, _ = buf.WriteString(index) _, _ = buf.WriteString(`","_id":"`) _, _ = buf.WriteString(id) _, _ = buf.WriteString(`"},`) return nil } func (b *Bulker) writeBulkMeta(buf *Buf, action, index, id, retry string) error { if err := b.validateMeta(index, id); err != nil { return err } _, _ = buf.WriteString(`{"`) _, _ = buf.WriteString(action) _, _ = buf.WriteString(`":{`) if id != "" { _, _ = buf.WriteString(`"_id":"`) _, _ = buf.WriteString(id) _, _ = buf.WriteString(`",`) } if retry != "" { _, _ = buf.WriteString(`"retry_on_conflict":`) _, _ = buf.WriteString(retry) _, _ = buf.WriteString(`,`) } _, _ = buf.WriteString(`"_index":"`) _, _ = buf.WriteString(index) _, _ = buf.WriteString("\"}}\n") return nil } func (b *Bulker) writeBulkBody(buf *Buf, action actionT, body []byte) error { if len(body) == 0 { if action == ActionDelete { return nil } // Weird to index, create, or update empty, but will allow _, _ = buf.WriteString("{}\n") return nil } if err := b.validateBody(body); err != nil { return err } _, _ = buf.Write(body) _, _ = buf.WriteRune('\n') return nil } func (b *Bulker) calcBulkSz(action, idx, id, retry string, body []byte) int { const kFraming = 19 metaSz := kFraming + len(action) + len(idx) if retry != "" { metaSz += 21 + len(retry) } var idSz int if id != "" { const kIDFraming = 9 idSz = kIDFraming + len(id) } var bodySz int if len(body) != 0 { const kBodyFraming = 1 bodySz = kBodyFraming + len(body) } return metaSz + idSz + bodySz } func (b *Bulker) flushBulk(ctx context.Context, queue queueT) error { start := time.Now() const kRoughEstimatePerItem = 200 bufSz := queue.cnt * kRoughEstimatePerItem if bufSz < queue.pending { bufSz = queue.pending } var buf bytes.Buffer buf.Grow(bufSz) queueCnt := 0 links := []apm.SpanLink{} for n := queue.head; n != nil; n = n.next { buf.Write(n.buf.Bytes()) queueCnt += 1 if n.spanLink != nil { links = append(links, *n.spanLink) } } // We should not encounter a case outside of testing where blk instances have no links // but just in case, set to nil to preserve default behavior if len(links) == 0 { links = nil } span, ctx := apm.StartSpanOptions(ctx, fmt.Sprintf("Flush: %s", queue.Type()), queue.Type(), apm.SpanOptions{ Links: links, }) defer span.End() // Do actual bulk request; defer to the client req := esapi.BulkRequest{ Body: bytes.NewReader(buf.Bytes()), } if queue.ty == kQueueRefreshBulk { req.Refresh = "true" } res, err := req.Do(ctx, b.es) if err != nil { zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Fail BulkRequest req.Do") return err } if res.Body != nil { defer res.Body.Close() } if res.IsError() { zerolog.Ctx(ctx).Error().Str("mod", kModBulk).Str("error.message", res.String()).Msg("Fail BulkRequest result") return parseError(res, zerolog.Ctx(ctx)) } // Reuse buffer buf.Reset() bodySz, err := buf.ReadFrom(res.Body) if err != nil { zerolog.Ctx(ctx).Error(). Err(err). Str("mod", kModBulk). Msg("Response error") return err } var blk bulkIndexerResponse blk.Items = make([]bulkStubItem, 0, queueCnt) // TODO: We're loosing information abut the errors, we should check a way // to return the full error ES returns if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil { zerolog.Ctx(ctx).Error().Err(err). Str("mod", kModBulk). Msg("flushBulk failed, could not unmarshal ES response") return fmt.Errorf("flushBulk failed, could not unmarshal ES response: %w", err) } if blk.HasErrors { // We lack information to properly correlate this error with what has failed. // Thus, for now it'd be more noise than information outside an investigation. zerolog.Ctx(ctx).Debug().Err(errors.New(buf.String())).Msg("Bulk call: Es returned an error") } zerolog.Ctx(ctx).Trace(). Err(err). Bool("refresh", queue.ty == kQueueRefreshBulk). Str("mod", kModBulk). Int("took", blk.Took). Dur("rtt", time.Since(start)). Bool("hasErrors", blk.HasErrors). Int("cnt", len(blk.Items)). Int("bufSz", bufSz). Int64("bodySz", bodySz). Msg("flushBulk") if len(blk.Items) != queueCnt { return fmt.Errorf("Bulk queue length mismatch") } // WARNING: Once we start pushing items to // the queue, the node pointers are invalid. // Do NOT return a non-nil value or failQueue // up the stack will fail. n := queue.head for i := range blk.Items { next := n.next // 'n' is invalid immediately on channel send item := blk.Items[i].Choose() select { case n.ch <- respT{ err: item.deriveError(), idx: n.idx, data: item, }: default: panic("Unexpected blocked response channel on flushBulk") } n = next } return nil } func (b *Bulker) HasTracer() bool { return b.tracer != nil } func (b *Bulker) StartTransaction(name, transactionType string) *apm.Transaction { return b.tracer.StartTransaction(name, transactionType) } func (b *Bulker) StartTransactionOptions(name, transactionType string, opts apm.TransactionOptions) *apm.Transaction { return b.tracer.StartTransactionOptions(name, transactionType, opts) }