bulk_indexer.go

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package docappender

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"slices"
	"strconv"
	"unsafe"

	jsoniter "github.com/json-iterator/go"
	"github.com/klauspost/compress/gzip"
	"go.elastic.co/fastjson"

	"github.com/elastic/elastic-transport-go/v8/elastictransport"
)

// At the time of writing, the go-elasticsearch BulkIndexer implementation
// sends all items to a channel, and multiple persistent worker goroutines
// receive those items and independently fill up their own buffers. Each one
// independently flushes when its buffer fills up, or when the flush interval
// elapses. If there are many workers, this may lead to sparse bulk requests.
//
// We take a different approach, where we fill up one bulk request at a time.
// When the buffer is filled up, or the flush interval elapses, we start a new
// goroutine to send the request in the background, with a limit on the number
// of concurrent bulk requests. This way we can ensure bulk requests have the
// maximum possible size, based on configuration and throughput.
//
// An illustrative sketch of this fill-then-flush pattern follows the type
// definitions below.

const (
	// Actions are all the actions that can be used when indexing data.
	// `create` will be used by default.
	ActionCreate = "create"
	ActionDelete = "delete"
	ActionIndex  = "index"
	ActionUpdate = "update"
)

// BulkIndexer issues bulk requests to Elasticsearch. It is NOT safe for concurrent use
// by multiple goroutines.
type BulkIndexer struct {
	config             BulkIndexerConfig
	itemsAdded         int
	bytesFlushed       int
	bytesUncompFlushed int
	jsonw              fastjson.Writer
	writer             *countWriter
	gzipw              *gzip.Writer
	copyBuf            []byte
	buf                bytes.Buffer
	retryCounts        map[int]int
	requireDataStream  bool
}

type BulkIndexerResponseStat struct {
	// Indexed contains the total number of successfully indexed documents.
	Indexed int64
	// RetriedDocs contains the total number of retried documents.
	RetriedDocs int64
	// FailureStoreDocs contains failure store specific document stats.
	FailureStoreDocs struct {
		// Used contains the total number of documents indexed to failure store.
		Used int64
		// Failed contains the total number of documents which failed when indexed to failure store.
		Failed int64
		// NotEnabled contains the total number of documents which could have been indexed to failure store
		// if it was enabled.
		NotEnabled int64
	}
	// GreatestRetry contains the greatest observed retry count in the entire
	// bulk request.
	GreatestRetry int
	// FailedDocs contains the failed documents.
	FailedDocs []BulkIndexerResponseItem
}
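
// exampleFillThenFlush is an illustrative sketch, not part of the original
// file: it demonstrates the fill-then-flush usage described at the top of
// this file. The maxBufBytes threshold is a hypothetical tuning knob, and the
// synchronous Flush is a simplification; the real appender sends requests in
// background goroutines.
func exampleFillThenFlush(ctx context.Context, indexer *BulkIndexer, docs [][]byte, maxBufBytes int) error {
	for _, doc := range docs {
		if err := indexer.Add(BulkIndexerItem{
			Index: "logs-example-default",
			Body:  bytes.NewReader(doc), // *bytes.Reader implements io.WriterTo
		}); err != nil {
			return err
		}
		// Flush once the buffered request is large enough.
		if indexer.Len() >= maxBufBytes {
			if _, err := indexer.Flush(ctx); err != nil {
				return err
			}
		}
	}
	// Flush any remainder.
	_, err := indexer.Flush(ctx)
	return err
}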

// BulkIndexerResponseItem represents the Elasticsearch response item.
type BulkIndexerResponseItem struct {
	Index    string `json:"_index"`
	Status   int    `json:"status"`
	Position int

	Error struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error,omitempty"`

	Input string `json:"-"`
}

// FailureStoreStatus defines an enumeration type for all known failure store statuses.
type FailureStoreStatus string

const (
	// FailureStoreStatusUnknown is the implicit status, indicating that there is
	// no information about this response or that the failure store is not applicable.
	FailureStoreStatusUnknown FailureStoreStatus = "not_applicable_or_unknown"
	// FailureStoreStatusUsed indicates that this document was stored in the failure store successfully.
	FailureStoreStatusUsed FailureStoreStatus = "used"
	// FailureStoreStatusFailed indicates that this document was rejected from the failure store.
	FailureStoreStatusFailed FailureStoreStatus = "failed"
	// FailureStoreStatusNotEnabled indicates that this document was rejected, but
	// could have ended up in the failure store had it been enabled.
	FailureStoreStatusNotEnabled FailureStoreStatus = "not_enabled"
)

func init() {
	jsoniter.RegisterTypeDecoderFunc("docappender.BulkIndexerResponseStat",
		func(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
			iter.ReadObjectCB(func(i *jsoniter.Iterator, s string) bool {
				switch s {
				case "items":
					var idx int
					iter.ReadArrayCB(func(i *jsoniter.Iterator) bool {
						return i.ReadMapCB(func(i *jsoniter.Iterator, s string) bool {
							var item BulkIndexerResponseItem
							i.ReadObjectCB(func(i *jsoniter.Iterator, s string) bool {
								switch s {
								case "_index":
									item.Index = i.ReadString()
								case "status":
									item.Status = i.ReadInt()
								case "failure_store":
									// For the stats, track only the actionable, explicit failure
									// store statuses "used", "failed" and "not_enabled".
									switch fs := i.ReadString(); FailureStoreStatus(fs) {
									case FailureStoreStatusUsed:
										(*((*BulkIndexerResponseStat)(ptr))).FailureStoreDocs.Used++
									case FailureStoreStatusFailed:
										(*((*BulkIndexerResponseStat)(ptr))).FailureStoreDocs.Failed++
									case FailureStoreStatusNotEnabled:
										(*((*BulkIndexerResponseStat)(ptr))).FailureStoreDocs.NotEnabled++
									}
								case "error":
									i.ReadObjectCB(func(i *jsoniter.Iterator, s string) bool {
										switch s {
										case "type":
											item.Error.Type = i.ReadString()
										case "reason":
											item.Error.Reason = i.ReadString()
										default:
											i.Skip()
										}
										return true
									})
								default:
									i.Skip()
								}
								return true
							})
							item.Position = idx
							idx++
							if item.Error.Type != "" || item.Status > 201 {
								(*((*BulkIndexerResponseStat)(ptr))).FailedDocs =
									append((*((*BulkIndexerResponseStat)(ptr))).FailedDocs, item)
							} else {
								(*((*BulkIndexerResponseStat)(ptr))).Indexed++
							}
							return true
						})
					})
					// No need to proceed further; return early.
					return false
				default:
					i.Skip()
					return true
				}
			})
		})
}

type countWriter struct {
	bytesWritten int
	io.Writer
}

func (cw *countWriter) Write(p []byte) (int, error) {
	cw.bytesWritten += len(p)
	return cw.Writer.Write(p)
}
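
// For reference, the jsoniter decoder registered in init above consumes a
// bulk response filtered down by filter_path (see newBulkIndexRequest below).
// An illustrative sample of such a response body, not captured from a live
// server:
//
//	{
//	  "items": [
//	    {"create": {"_index": "logs-example-default", "status": 201}},
//	    {"create": {
//	      "_index": "logs-example-default",
//	      "status": 429,
//	      "error": {"type": "es_rejected_execution_exception", "reason": "..."}
//	    }}
//	  ]
//	}
//
// The first item increments Indexed; the second is recorded in FailedDocs
// with Position 1, because its status is above 201.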

// NewBulkIndexer returns a bulk indexer that issues bulk requests to Elasticsearch.
// It is only tested with the v8 go-elasticsearch client. Use other clients at your own risk.
// The returned BulkIndexer is NOT safe for concurrent use by multiple goroutines.
func NewBulkIndexer(cfg BulkIndexerConfig) (*BulkIndexer, error) {
	if err := cfg.Validate(); err != nil {
		return nil, err
	}
	return newBulkIndexer(cfg), nil
}

func newBulkIndexer(cfg BulkIndexerConfig) *BulkIndexer {
	b := &BulkIndexer{
		config:            cfg,
		retryCounts:       make(map[int]int),
		requireDataStream: cfg.RequireDataStream,
	}
	// Use a len check instead of a nil check because document level retries
	// should be disabled using MaxDocumentRetries instead.
	if len(b.config.RetryOnDocumentStatus) == 0 {
		b.config.RetryOnDocumentStatus = []int{http.StatusTooManyRequests}
	}
	var writer io.Writer
	if cfg.CompressionLevel != gzip.NoCompression {
		b.gzipw, _ = gzip.NewWriterLevel(&b.buf, cfg.CompressionLevel)
		writer = b.gzipw
	} else {
		writer = &b.buf
	}
	b.writer = &countWriter{0, writer}
	return b
}

// Reset resets the bulk indexer, ready for a new request.
func (b *BulkIndexer) Reset() {
	b.bytesFlushed = 0
	b.bytesUncompFlushed = 0
}

// SetClient resets the client used by the bulk indexer.
func (b *BulkIndexer) SetClient(client elastictransport.Interface) {
	b.config.Client = client
}

// resetBuf resets the compressed buffer after flushing it to Elasticsearch.
func (b *BulkIndexer) resetBuf() {
	b.itemsAdded = 0
	b.writer.bytesWritten = 0
	b.buf.Reset()
	if b.gzipw != nil {
		b.gzipw.Reset(&b.buf)
	}
}

// Items returns the number of buffered items.
func (b *BulkIndexer) Items() int {
	return b.itemsAdded
}

// Len returns the number of buffered bytes.
func (b *BulkIndexer) Len() int {
	return b.buf.Len()
}

// UncompressedLen returns the number of uncompressed buffered bytes.
func (b *BulkIndexer) UncompressedLen() int {
	return b.writer.bytesWritten
}

// BytesFlushed returns the number of bytes flushed by the bulk indexer.
func (b *BulkIndexer) BytesFlushed() int {
	return b.bytesFlushed
}

// BytesUncompressedFlushed returns the number of uncompressed bytes flushed by the bulk indexer.
func (b *BulkIndexer) BytesUncompressedFlushed() int {
	return b.bytesUncompFlushed
}

type BulkIndexerItem struct {
	Index             string
	DocumentID        string
	Pipeline          string
	Action            string
	Body              io.WriterTo
	DynamicTemplates  map[string]string
	RequireDataStream bool
}
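
// Note (illustrative, not part of the original file): Body must implement
// io.WriterTo. The standard library's *bytes.Reader, *bytes.Buffer and
// *strings.Reader all qualify, so a minimal item can be built as:
//
//	item := BulkIndexerItem{
//		Index: "logs-example-default",
//		Body:  bytes.NewReader([]byte(`{"@timestamp":"2024-01-01T00:00:00Z"}`)),
//	}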

// Add encodes an item in the buffer.
func (b *BulkIndexer) Add(item BulkIndexerItem) error {
	action := item.Action
	if action == "" {
		action = ActionCreate
	}
	switch action {
	case ActionCreate, ActionDelete, ActionIndex, ActionUpdate:
	default:
		return fmt.Errorf("%s is not a valid action", action)
	}
	b.writeMeta(
		item.Index,
		item.DocumentID,
		item.Pipeline,
		action,
		item.DynamicTemplates,
		item.RequireDataStream,
	)
	if _, err := item.Body.WriteTo(b.writer); err != nil {
		return fmt.Errorf("failed to write bulk indexer item: %w", err)
	}
	if _, err := b.writer.Write([]byte("\n")); err != nil {
		return fmt.Errorf("failed to write newline: %w", err)
	}
	b.itemsAdded++
	return nil
}

func (b *BulkIndexer) writeMeta(
	index, documentID, pipeline, action string,
	dynamicTemplates map[string]string,
	requireDataStream bool,
) {
	b.jsonw.RawString(`{"`)
	b.jsonw.RawString(action)
	b.jsonw.RawString(`":{`)
	first := true
	if documentID != "" {
		b.jsonw.RawString(`"_id":`)
		b.jsonw.String(documentID)
		first = false
	}
	if index != "" {
		if !first {
			b.jsonw.RawByte(',')
		}
		b.jsonw.RawString(`"_index":`)
		b.jsonw.String(index)
		first = false
	}
	if pipeline != "" {
		if !first {
			b.jsonw.RawByte(',')
		}
		b.jsonw.RawString(`"pipeline":`)
		b.jsonw.String(pipeline)
		first = false
	}
	if len(dynamicTemplates) > 0 {
		if !first {
			b.jsonw.RawByte(',')
		}
		b.jsonw.RawString(`"dynamic_templates":{`)
		firstDynamicTemplate := true
		for k, v := range dynamicTemplates {
			if !firstDynamicTemplate {
				b.jsonw.RawByte(',')
			}
			b.jsonw.String(k)
			b.jsonw.RawByte(':')
			b.jsonw.String(v)
			firstDynamicTemplate = false
		}
		b.jsonw.RawByte('}')
		first = false
	}
	if requireDataStream {
		if !first {
			b.jsonw.RawByte(',')
		}
		b.jsonw.RawString(`"require_data_stream":true`)
		first = false
	}
	b.jsonw.RawString("}}\n")
	b.writer.Write(b.jsonw.Bytes())
	b.jsonw.Reset()
}

func (b *BulkIndexer) newBulkIndexRequest(ctx context.Context) (*http.Request, error) {
	// We should not pass the original b.buf bytes.Buffer down to the client/http layer because
	// the indexer will reuse the buffer. The underlying http client/transport implementation may keep
	// reading from the buffer after the request is done and the call to `req.Do` has returned.
	// This may happen in HTTP error cases when the server isn't required to read the full
	// request body before sending a response.
	// This can cause undefined behavior (and panics) due to concurrent reads/writes to bytes.Buffer
	// internal member variables (b.buf.off, b.buf.lastRead).
	// See: https://github.com/golang/go/issues/51907
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/_bulk", bytes.NewReader(b.buf.Bytes()))
	if err != nil {
		return nil, err
	}
	req.Header.Add("Content-Type", "application/json")

	v := req.URL.Query()
	if b.config.Pipeline != "" {
		v.Set("pipeline", b.config.Pipeline)
	}
	if b.config.RequireDataStream {
		v.Set("require_data_stream", strconv.FormatBool(b.config.RequireDataStream))
	}
	v.Set("filter_path", "items.*._index,items.*.status,items.*.failure_store,items.*.error.type,items.*.error.reason")
	if b.config.IncludeSourceOnError != Unset {
		switch b.config.IncludeSourceOnError {
		case False:
			v.Set("include_source_on_error", "false")
		case True:
			v.Set("include_source_on_error", "true")
		}
	}
	req.URL.RawQuery = v.Encode()
	return req, nil
}
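
// For reference (illustrative, not part of the original file): adding an item
// such as
//
//	BulkIndexerItem{Index: "logs-example-default", DocumentID: "1", Body: doc}
//
// appends two newline-terminated lines to the buffer, the action line written
// by writeMeta followed by the document body written by Add:
//
//	{"create":{"_id":"1","_index":"logs-example-default"}}
//	{"@timestamp":"2024-01-01T00:00:00Z","message":"hello"}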

// Flush executes a bulk request if there are any items buffered, and clears out the buffer.
func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error) {
	if b.itemsAdded == 0 {
		return BulkIndexerResponseStat{}, nil
	}
	if b.gzipw != nil {
		if err := b.gzipw.Close(); err != nil {
			return BulkIndexerResponseStat{}, fmt.Errorf("failed closing the gzip writer: %w", err)
		}
	}
	if b.config.MaxDocumentRetries > 0 || b.config.PopulateFailedDocsInput {
		n := b.buf.Len()
		if cap(b.copyBuf) < n {
			b.copyBuf = slices.Grow(b.copyBuf, n-len(b.copyBuf))
		}
		b.copyBuf = b.copyBuf[:n]
		copy(b.copyBuf, b.buf.Bytes())
	}
	req, err := b.newBulkIndexRequest(ctx)
	if err != nil {
		return BulkIndexerResponseStat{}, fmt.Errorf("failed to create bulk index request: %w", err)
	}
	if b.gzipw != nil {
		req.Header.Set("Content-Encoding", "gzip")
	}
	bytesFlushed := b.buf.Len()
	bytesUncompFlushed := b.writer.bytesWritten
	res, err := b.config.Client.Perform(req)
	if err != nil {
		b.resetBuf()
		return BulkIndexerResponseStat{}, fmt.Errorf("failed to execute the request: %w", err)
	}
	defer res.Body.Close()

	// Reset the buffer and gzip writer so they can be reused in case
	// document level retries are needed.
	b.resetBuf()

	// Record the number of flushed bytes only when err == nil. The body may
	// not have been sent otherwise.
	b.bytesFlushed = bytesFlushed
	b.bytesUncompFlushed = bytesUncompFlushed

	var resp BulkIndexerResponseStat
	if res.StatusCode > 299 {
		var s string
		if b.config.IncludeSourceOnError == Unset {
			var er struct {
				Error struct {
					Type     string `json:"type,omitempty"`
					Reason   string `json:"reason,omitempty"`
					CausedBy struct {
						Type   string `json:"type,omitempty"`
						Reason string `json:"reason,omitempty"`
					} `json:"caused_by,omitempty"`
				} `json:"error,omitempty"`
			}
			if err := jsoniter.NewDecoder(res.Body).Decode(&er); err == nil {
				er.Error.Reason = ""
				er.Error.CausedBy.Reason = ""
				b, _ := json.Marshal(&er)
				s = string(b)
			}
		} else {
			b, err := io.ReadAll(res.Body)
			if err != nil {
				return BulkIndexerResponseStat{}, fmt.Errorf("failed to read response body: %w", err)
			}
			s = string(b)
		}
		e := ErrorFlushFailed{resp: s, statusCode: res.StatusCode}
		switch {
		case res.StatusCode == 429:
			e.tooMany = true
		case res.StatusCode >= 500:
			e.serverError = true
		case res.StatusCode >= 400 && res.StatusCode != 429:
			e.clientError = true
		}
		return resp, e
	}

	if err := jsoniter.NewDecoder(res.Body).Decode(&resp); err != nil {
		return resp, fmt.Errorf("error decoding bulk response: %w", err)
	}
	if b.config.IncludeSourceOnError == Unset {
		for i, doc := range resp.FailedDocs {
			doc.Error.Reason = ""
			resp.FailedDocs[i] = doc
		}
	}
	if b.config.MaxDocumentRetries > 0 || b.config.PopulateFailedDocsInput {
		buf := make([]byte, 0, 4096)

		// Eliminate previous retry counts that aren't present in the bulk
		// request response.
		for k := range b.retryCounts {
			found := false
			for _, res := range resp.FailedDocs {
				if res.Position == k {
					found = true
					break
				}
			}
			if !found {
				// The retried request succeeded; remove it from the retry counts.
				delete(b.retryCounts, k)
			}
		}
		var gr *gzip.Reader
		if b.gzipw != nil {
			gr, err = gzip.NewReader(bytes.NewReader(b.copyBuf))
			if err != nil {
				return resp, fmt.Errorf("failed to decompress request payload: %w", err)
			}
			defer gr.Close()
		}
		lastln := 0
		lastIdx := 0
		// Keep track of the previously seen newlines;
		// the buffer is being read lazily.
		seen := 0

		// writeItemAtPos writes the two lines for the document at position
		// `pos` in the bulk request to the io.Writer `to`.
		writeItemAtPos := func(to io.Writer, pos int) error {
			// There are two lines for each document:
			// - action
			// - document
			//
			// Find the document by looking up the newline separators:
			// first the newline (if it exists) before the 'action' line, then
			// the newline at the end of the 'document' line.
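			//
			// (Worked example, added for illustration: the item at pos == 2
			// occupies the 5th and 6th lines of the NDJSON payload, so
			// startln == 4 and endln == 6; its bytes start right after the
			// 4th newline and end at, and include, the 6th.)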
			startln := pos * 2
			endln := startln + 2

			if b.gzipw != nil {
				// First loop: read from the gzip reader.
				if len(buf) == 0 {
					n, err := gr.Read(buf[:cap(buf)])
					if err != nil && err != io.EOF {
						return fmt.Errorf("failed to read from compressed buffer: %w", err)
					}
					buf = buf[:n]
				}
				// Newlines in the current buf.
				newlines := bytes.Count(buf, []byte{'\n'})
				// Loop until we've seen the start newline.
				for seen+newlines < startln {
					seen += newlines
					n, err := gr.Read(buf[:cap(buf)])
					if err != nil && err != io.EOF {
						return fmt.Errorf("failed to read from compressed buffer: %w", err)
					}
					buf = buf[:n]
					newlines = bytes.Count(buf, []byte{'\n'})
				}
				startIdx := indexnth(buf, startln-seen, '\n') + 1
				endIdx := indexnth(buf, endln-seen, '\n') + 1
				// If the end newline is not in the buffer, read more data.
				if endIdx == 0 {
					// Write what we have.
					to.Write(buf[startIdx:])
					// Loop until we've seen the end newline.
					for seen+newlines < endln {
						seen += newlines
						n, err := gr.Read(buf[:cap(buf)])
						if err != nil && err != io.EOF {
							return fmt.Errorf("failed to read from compressed buffer: %w", err)
						}
						buf = buf[:n]
						newlines = bytes.Count(buf, []byte{'\n'})
						if seen+newlines < endln {
							// endln is not here; write what we have and keep going.
							to.Write(buf)
						}
					}
					// Try again to find the end newline in the extra data
					// we just read.
					endIdx = indexnth(buf, endln-seen, '\n') + 1
					to.Write(buf[:endIdx])
				} else {
					// The end newline is in the buffer; write the event.
					to.Write(buf[startIdx:endIdx])
				}
			} else {
				startIdx := indexnth(b.copyBuf[lastIdx:], startln-lastln, '\n') + 1
				endIdx := indexnth(b.copyBuf[lastIdx:], endln-lastln, '\n') + 1
				to.Write(b.copyBuf[lastIdx:][startIdx:endIdx])
				lastln = endln
				lastIdx += endIdx
			}
			return nil
		}

		// inputBuf is only used to populate the failed item input.
		var inputBuf bytes.Buffer
		nonRetriable := resp.FailedDocs[:0]
		// appendNonRetriable appends an item deemed non-retriable to the nonRetriable
		// slice. At the same time, it optionally populates item.Input with the action
		// and document lines of the document.
		appendNonRetriable := func(item BulkIndexerResponseItem) (err error) {
			if b.config.PopulateFailedDocsInput {
				inputBuf.Reset()
				// In an ideal world, PopulateFailedDocsInput failures should not cause
				// a flush failure. But since this is only for debugging / testing, and
				// any writeItemAtPos failure would reveal a bug in the code, fail fast
				// and explicitly.
				err = writeItemAtPos(&inputBuf, item.Position)
				item.Input = inputBuf.String()
			}
			nonRetriable = append(nonRetriable, item)
			return
		}
		for _, item := range resp.FailedDocs {
			if b.config.MaxDocumentRetries > 0 && b.shouldRetryOnStatus(item.Status) {
				// Increment the retry count for the positions found.
				count := b.retryCounts[item.Position] + 1
				// Check if we are above the MaxDocumentRetries setting.
				if count > b.config.MaxDocumentRetries {
					// Do not retry; return the document as failed.
					if err := appendNonRetriable(item); err != nil {
						return resp, err
					}
					continue
				}
				if resp.GreatestRetry < count {
					resp.GreatestRetry = count
				}
				// Since some items may have succeeded, counter positions need
				// to be updated to match the next current buffer position.
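				//
				// (Worked example, added for illustration: if a 5-item request
				// had retriable failures at positions 1 and 3, they are
				// re-added as items 0 and 1 of the next buffer, so their retry
				// counts are re-keyed from positions 1 and 3 to 0 and 1.)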
				b.retryCounts[b.itemsAdded] = count
				if err := writeItemAtPos(b.writer, item.Position); err != nil {
					return resp, err
				}
				resp.RetriedDocs++
				b.itemsAdded++
			} else {
				// If it's not a retriable error, treat the document as failed.
				if err := appendNonRetriable(item); err != nil {
					return resp, err
				}
			}
		}
		// FailedDocs contains the responses of:
		// - non-retriable errors
		// - retriable errors that reached the retry limit
		resp.FailedDocs = nonRetriable
	}
	return resp, nil
}

func (b *BulkIndexer) shouldRetryOnStatus(docStatus int) bool {
	for _, status := range b.config.RetryOnDocumentStatus {
		if docStatus == status {
			return true
		}
	}
	return false
}

// indexnth returns the index of the nth instance of sep in s.
// It returns -1 if sep is not present in s or nth is 0.
func indexnth(s []byte, nth int, sep rune) int {
	if nth == 0 {
		return -1
	}
	count := 0
	return bytes.IndexFunc(s, func(r rune) bool {
		if r == sep {
			count++
		}
		return nth == count
	})
}

type ErrorFlushFailed struct {
	resp        string
	statusCode  int
	tooMany     bool
	clientError bool
	serverError bool
}

func (e ErrorFlushFailed) StatusCode() int {
	return e.statusCode
}

func (e ErrorFlushFailed) ResponseBody() string {
	return e.resp
}

func (e ErrorFlushFailed) Error() string {
	return fmt.Sprintf("flush failed (%d): %s", e.statusCode, e.resp)
}
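
// handleFlushError is an illustrative sketch, not part of the original file:
// it shows one way a caller inside this package could classify a Flush error.
// The suggested reactions in the comments are assumptions, not policy
// prescribed by this package.
func handleFlushError(err error) {
	if e, ok := err.(ErrorFlushFailed); ok {
		switch {
		case e.tooMany:
			// 429 Too Many Requests: back off, then retry the bulk request.
		case e.serverError:
			// 5xx: possibly transient; a retry may succeed.
		case e.clientError:
			// Other 4xx: retrying the same payload is unlikely to help.
		}
		_ = e.ResponseBody() // full response body, useful for logging
	}
}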