in bulk_indexer.go [421:691]
func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error) {
if b.itemsAdded == 0 {
return BulkIndexerResponseStat{}, nil
}
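// Closing the gzip writer flushes any remaining compressed bytes into the
// underlying buffer so the request body is complete before it is sent.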
if b.gzipw != nil {
if err := b.gzipw.Close(); err != nil {
return BulkIndexerResponseStat{}, fmt.Errorf("failed closing the gzip writer: %w", err)
}
}
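// Keep a copy of the (possibly compressed) payload: the live buffer is reset
// after the request is performed, but document-level retries and
// PopulateFailedDocsInput need to re-read the original action/document lines.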
if b.config.MaxDocumentRetries > 0 || b.config.PopulateFailedDocsInput {
n := b.buf.Len()
if cap(b.copyBuf) < n {
b.copyBuf = slices.Grow(b.copyBuf, n-len(b.copyBuf))
}
b.copyBuf = b.copyBuf[:n]
copy(b.copyBuf, b.buf.Bytes())
}
req, err := b.newBulkIndexRequest(ctx)
if err != nil {
return BulkIndexerResponseStat{}, fmt.Errorf("failed to create bulk index request: %w", err)
}
if b.gzipw != nil {
req.Header.Set("Content-Encoding", "gzip")
}
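// Capture the compressed and uncompressed payload sizes now, before the
// buffer and writer counters are reset below.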
bytesFlushed := b.buf.Len()
bytesUncompFlushed := b.writer.bytesWritten
res, err := b.config.Client.Perform(req)
if err != nil {
b.resetBuf()
return BulkIndexerResponseStat{}, fmt.Errorf("failed to execute the request: %w", err)
}
defer res.Body.Close()
// Reset the buffer and gzip writer so they can be reused in case
// document-level retries are needed.
b.resetBuf()
// Record the number of flushed bytes only when err == nil; the body may not
// have been sent otherwise.
b.bytesFlushed = bytesFlushed
b.bytesUncompFlushed = bytesUncompFlushed
var resp BulkIndexerResponseStat
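// A status above 299 means the _bulk request failed as a whole (e.g. 429
// back-pressure or a 5xx); per-document failures come back in a 2xx response
// body and are handled further below.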
if res.StatusCode > 299 {
var s string
if b.config.IncludeSourceOnError == Unset {
var er struct {
Error struct {
Type string `json:"type,omitempty"`
Reason string `json:"reason,omitempty"`
CausedBy struct {
Type string `json:"type,omitempty"`
Reason string `json:"reason,omitempty"`
} `json:"caused_by,omitempty"`
} `json:"error,omitempty"`
}
if err := jsoniter.NewDecoder(res.Body).Decode(&er); err == nil {
er.Error.Reason = ""
er.Error.CausedBy.Reason = ""
body, _ := json.Marshal(&er)
s = string(body)
}
} else {
body, err := io.ReadAll(res.Body)
if err != nil {
return BulkIndexerResponseStat{}, fmt.Errorf("failed to read response body: %w", err)
}
s = string(body)
}
e := ErrorFlushFailed{resp: s, statusCode: res.StatusCode}
switch {
case res.StatusCode == 429:
e.tooMany = true
case res.StatusCode >= 500:
e.serverError = true
case res.StatusCode >= 400:
e.clientError = true
}
return resp, e
}
if err := jsoniter.NewDecoder(res.Body).Decode(&resp); err != nil {
return resp, fmt.Errorf("error decoding bulk response: %w", err)
}
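// Unless the caller opted in via IncludeSourceOnError, strip per-document
// error reasons, which may echo parts of the offending document source.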
if b.config.IncludeSourceOnError == Unset {
for i, doc := range resp.FailedDocs {
doc.Error.Reason = ""
resp.FailedDocs[i] = doc
}
}
if b.config.MaxDocumentRetries > 0 || b.config.PopulateFailedDocsInput {
buf := make([]byte, 0, 4096)
// Drop retry counts for positions that no longer appear among the failed
// documents in the bulk response: those retried documents have succeeded.
for k := range b.retryCounts {
found := false
for _, res := range resp.FailedDocs {
if res.Position == k {
found = true
break
}
}
if !found {
// Retried request succeeded, remove from retry counts
delete(b.retryCounts, k)
}
}
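// When gzip is enabled the retained copy of the payload is compressed, so
// retried documents are recovered by streaming it back through a gzip.Reader.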
var gr *gzip.Reader
if b.gzipw != nil {
gr, err = gzip.NewReader(bytes.NewReader(b.copyBuf))
if err != nil {
return resp, fmt.Errorf("failed to decompress request payload: %w", err)
}
defer gr.Close()
}
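// lastln/lastIdx remember how far into copyBuf earlier lookups advanced, so
// the uncompressed path scans the payload in a single forward pass. This
// relies on failed positions being visited in ascending order.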
lastln := 0
lastIdx := 0
// seen counts the newlines already consumed from the gzip stream, since
// the compressed buffer is read lazily in chunks.
seen := 0
// writeItemAtPos writes the two lines (action and document) for the item at
// position `pos` in the bulk request to the io.Writer `to`.
writeItemAtPos := func(to io.Writer, pos int) error {
// there are two lines for each document:
// - action
// - document
//
// Find the document by locating the newline separators: first the
// newline (if any) preceding the 'action' line, then the newline at
// the end of the 'document' line.
startln := pos * 2
endln := startln + 2
if b.gzipw != nil {
// Initial fill: read the first chunk from the gzip reader.
if len(buf) == 0 {
n, err := gr.Read(buf[:cap(buf)])
if err != nil && err != io.EOF {
return fmt.Errorf("failed to read from compressed buffer: %w", err)
}
buf = buf[:n]
}
// newlines in the current buf
newlines := bytes.Count(buf, []byte{'\n'})
// loop until we've seen the start newline
for seen+newlines < startln {
seen += newlines
n, err := gr.Read(buf[:cap(buf)])
if err != nil && err != io.EOF {
return fmt.Errorf("failed to read from compressed buffer: %w", err)
}
buf = buf[:n]
newlines = bytes.Count(buf, []byte{'\n'})
}
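// indexnth (a helper defined elsewhere in this file) is assumed to return the
// index of the nth occurrence of the separator, or -1 if there are fewer
// occurrences; adding 1 yields the offset of the line that follows it.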
startIdx := indexnth(buf, startln-seen, '\n') + 1
endIdx := indexnth(buf, endln-seen, '\n') + 1
// If the end newline is not in the buffer, read more data
if endIdx == 0 {
// Write what we have
to.Write(buf[startIdx:])
// loop until we've seen the end newline
for seen+newlines < endln {
seen += newlines
n, err := gr.Read(buf[:cap(buf)])
if err != nil && err != io.EOF {
return fmt.Errorf("failed to read from compressed buffer: %w", err)
}
buf = buf[:n]
newlines = bytes.Count(buf, []byte{'\n'})
if seen+newlines < endln {
// endln is not here, write what we have and keep going
to.Write(buf)
}
}
// try again to find the end newline in the extra data
// we just read.
endIdx = indexnth(buf, endln-seen, '\n') + 1
to.Write(buf[:endIdx])
} else {
// If the end newline is in the buffer, write the event
to.Write(buf[startIdx:endIdx])
}
} else {
startIdx := indexnth(b.copyBuf[lastIdx:], startln-lastln, '\n') + 1
endIdx := indexnth(b.copyBuf[lastIdx:], endln-lastln, '\n') + 1
to.Write(b.copyBuf[lastIdx:][startIdx:endIdx])
lastln = endln
lastIdx += endIdx
}
return nil
}
// inputBuf is only used to populate failed item input
var inputBuf bytes.Buffer
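// Reuse the FailedDocs backing array to collect non-retriable items without
// allocating; this is safe because the write index never overtakes the read
// index of the range loop below.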
nonRetriable := resp.FailedDocs[:0]
// appendNonRetriable appends an item deemed non-retriable to the nonRetriable slice.
// When PopulateFailedDocsInput is set, it also populates item.Input with the item's
// action and document lines from the original request payload.
appendNonRetriable := func(item BulkIndexerResponseItem) (err error) {
if b.config.PopulateFailedDocsInput {
inputBuf.Reset()
// Ideally, PopulateFailedDocsInput failures should not cause the flush to fail.
// But since the option exists only for debugging / testing, and any error from
// writeItemAtPos would reveal a bug in the code, fail fast and explicitly.
err = writeItemAtPos(&inputBuf, item.Position)
item.Input = inputBuf.String()
}
nonRetriable = append(nonRetriable, item)
return
}
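// Partition the failed documents: retriable statuses (per shouldRetryOnStatus)
// that have not exhausted MaxDocumentRetries are re-queued into the live buffer
// for the next flush; everything else is surfaced to the caller.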
for _, item := range resp.FailedDocs {
if b.config.MaxDocumentRetries > 0 && b.shouldRetryOnStatus(item.Status) {
// Increment retry count for the positions found.
count := b.retryCounts[item.Position] + 1
// check if we are above the maxDocumentRetry setting
if count > b.config.MaxDocumentRetries {
// do not retry, return the document as failed
if err := appendNonRetriable(item); err != nil {
return resp, err
}
continue
}
if resp.GreatestRetry < count {
resp.GreatestRetry = count
}
// Since some items may have succeeded, the retry count is re-keyed to the
// document's new position in the buffer being built for the next flush.
b.retryCounts[b.itemsAdded] = count
if err := writeItemAtPos(b.writer, item.Position); err != nil {
return resp, err
}
resp.RetriedDocs++
b.itemsAdded++
} else {
// If it's not a retriable error, treat the document as failed
if err := appendNonRetriable(item); err != nil {
return resp, err
}
}
}
// FailedDocs now contains only the responses for:
// - non-retriable errors
// - retriable errors that reached the retry limit
resp.FailedDocs = nonRetriable
}
return resp, nil
}