internal/pkg/bulk/opSearch.go (180 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. package bulk import ( "bytes" "context" "encoding/json" "fmt" "time" "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/sqn" "github.com/elastic/go-elasticsearch/v8/esapi" "github.com/mailru/easyjson" "github.com/rs/zerolog" "go.elastic.co/apm/v2" ) func (b *Bulker) Search(ctx context.Context, index string, body []byte, opts ...Opt) (*es.ResultT, error) { span, ctx := apm.StartSpan(ctx, "Bulker: search", "bulker") defer span.End() opt := b.parseOpts(append(opts, withAPMLinkedContext(ctx))...) action := ActionSearch // Use /_fleet/_fleet_msearch fleet plugin endpoint if need to wait for checkpoints if len(opt.WaitForCheckpoints) > 0 { action = ActionFleetSearch } blk := b.newBlk(action, opt) // Serialize request const kSlop = 64 blk.buf.Grow(len(body) + kSlop) if err := b.writeMsearchMeta(&blk.buf, index, opt.Indices, opt.WaitForCheckpoints, opt.IgnoreUnavailable); err != nil { return nil, err } if err := b.writeMsearchBody(&blk.buf, body); err != nil { return nil, err } // Process response resp := b.dispatch(ctx, blk) if resp.err != nil { return nil, resp.err } b.freeBlk(blk) // TODO if we can ever set span links after creation we can inject the flushQueue span into resp and link to the action span here. // Interpret response r, ok := resp.data.(*MsearchResponseItem) if !ok { return nil, fmt.Errorf("unable to cast response as type *MsearchResponseItem, detected type: %T", resp.data) } return &es.ResultT{HitsT: r.Hits, Aggregations: r.Aggregations}, nil } func (b *Bulker) writeMsearchMeta(buf *Buf, index string, moreIndices []string, checkpoints []int64, ignoreUnavailble bool) error { if err := b.validateIndex(index); err != nil { return err } needComma := true _, _ = buf.WriteString("{") if len(moreIndices) > 0 { if err := b.validateIndices(moreIndices); err != nil { return err } indices := []string{index} indices = append(indices, moreIndices...) _, _ = buf.WriteString(`"index": `) if d, err := json.Marshal(indices); err != nil { return err } else { _, _ = buf.Write(d) } } else if index != "" { _, _ = buf.WriteString(`"index": "`) _, _ = buf.WriteString(index) _, _ = buf.WriteString("\"") } else { needComma = false } if ignoreUnavailble { if needComma { _, _ = buf.WriteString(`,`) } _, _ = buf.WriteString(`"ignore_unavailable": true`) needComma = true } if len(checkpoints) > 0 { if needComma { _, _ = buf.WriteString(`,`) } _, _ = buf.WriteString(` "wait_for_checkpoints": `) // Write array as string, example: [1,2,3] _, _ = buf.WriteString(sqn.SeqNo(checkpoints).JSONString()) } _, _ = buf.WriteString("}\n") return nil } func (b *Bulker) writeMsearchBody(buf *Buf, body []byte) error { _, _ = buf.Write(body) _, _ = buf.WriteRune('\n') return b.validateBody(body) } func (b *Bulker) flushSearch(ctx context.Context, queue queueT) error { start := time.Now() const kRoughEstimatePerItem = 256 bufSz := queue.cnt * kRoughEstimatePerItem if bufSz < queue.pending { bufSz = queue.pending } var buf bytes.Buffer buf.Grow(bufSz) queueCnt := 0 links := []apm.SpanLink{} for n := queue.head; n != nil; n = n.next { buf.Write(n.buf.Bytes()) queueCnt += 1 if n.spanLink != nil { links = append(links, *n.spanLink) } } if len(links) == 0 { links = nil } span, ctx := apm.StartSpanOptions(ctx, "Flush: search", "search", apm.SpanOptions{ Links: links, }) defer span.End() // Do actual bulk request; and send response on chan var ( res *esapi.Response err error ) if queue.ty == kQueueFleetSearch { req := esapi.FleetMsearchRequest{ Body: bytes.NewReader(buf.Bytes()), } res, err = req.Do(ctx, b.es) } else { req := esapi.MsearchRequest{ Body: bytes.NewReader(buf.Bytes()), } res, err = req.Do(ctx, b.es) } if err != nil { return err } if res.Body != nil { defer res.Body.Close() } if res.IsError() { zerolog.Ctx(ctx).Warn().Str("mod", kModBulk).Str("error.message", res.String()).Msg("bulker.flushSearch: Fail writeMsearchBody") return parseError(res, zerolog.Ctx(ctx)) } // Reuse buffer buf.Reset() bodySz, err := buf.ReadFrom(res.Body) if err != nil { zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("MsearchResponse error") return err } // prealloc slice var blk MsearchResponse blk.Responses = make([]MsearchResponseItem, 0, queueCnt) if err = easyjson.Unmarshal(buf.Bytes(), &blk); err != nil { zerolog.Ctx(ctx).Error().Err(err).Str("mod", kModBulk).Msg("Unmarshal error") return err } zerolog.Ctx(ctx).Trace(). Err(err). Str("mod", kModBulk). Dur("rtt", time.Since(start)). Int("took", blk.Took). Int("cnt", len(blk.Responses)). Int("bufSz", bufSz). Int64("bodySz", bodySz). Msg("flushSearch") if len(blk.Responses) != queueCnt { return fmt.Errorf("Bulk queue length mismatch") } // WARNING: Once we start pushing items to // the queue, the node pointers are invalid. // Do NOT return a non-nil value or failQueue // up the stack will fail. n := queue.head for i := range blk.Responses { next := n.next // 'n' is invalid immediately on channel send response := &blk.Responses[i] select { case n.ch <- respT{ err: response.deriveError(), idx: n.idx, data: response, }: default: panic("Unexpected blocked response channel on flushSearch") } n = next } return nil }