func()

in bulk_indexer.go [421:691]


// Flush sends the buffered bulk request and returns the decoded response
// statistics.
//
// If no items have been added, Flush returns a zero BulkIndexerResponseStat
// and a nil error without performing a request. When a gzip writer is in use
// it is closed before the request is built and the request is sent with a
// "Content-Encoding: gzip" header.
//
// A response with a status code above 299 is reported as an ErrorFlushFailed
// holding the status code and the response body. When IncludeSourceOnError is
// Unset, only the error and caused_by types are kept in that body (the reason
// fields are blanked), and the reason of every failed document in a successful
// response is blanked as well.
//
// When MaxDocumentRetries > 0 or PopulateFailedDocsInput is set, the request
// payload is copied before it is sent. After a successful response, each
// failed document whose status is retriable (per shouldRetryOnStatus) and
// whose retry count has not exceeded MaxDocumentRetries is written back to the
// indexer's writer so that it is part of the next flush; all other failed
// documents are returned in FailedDocs, optionally with their original action
// and document lines in Input.
func (b *BulkIndexer) Flush(ctx context.Context) (BulkIndexerResponseStat, error) {
	if b.itemsAdded == 0 {
		return BulkIndexerResponseStat{}, nil
	}

	if b.gzipw != nil {
		if err := b.gzipw.Close(); err != nil {
			return BulkIndexerResponseStat{}, fmt.Errorf("failed closing the gzip writer: %w", err)
		}
	}

	needsPayloadCopy := b.config.MaxDocumentRetries > 0 || b.config.PopulateFailedDocsInput
	if needsPayloadCopy {
		// Keep a copy of the (possibly compressed) payload: the buffer is
		// reset after the request is performed, but failed documents have to
		// be looked up in it afterwards.
		n := b.buf.Len()
		if cap(b.copyBuf) < n {
			b.copyBuf = slices.Grow(b.copyBuf, n-len(b.copyBuf))
		}
		b.copyBuf = b.copyBuf[:n]
		copy(b.copyBuf, b.buf.Bytes())
	}

	req, err := b.newBulkIndexRequest(ctx)
	if err != nil {
		return BulkIndexerResponseStat{}, fmt.Errorf("failed to create bulk index request: %w", err)
	}

	if b.gzipw != nil {
		req.Header.Set("Content-Encoding", "gzip")
	}

	bytesFlushed := b.buf.Len()
	bytesUncompFlushed := b.writer.bytesWritten
	res, err := b.config.Client.Perform(req)
	if err != nil {
		b.resetBuf()
		return BulkIndexerResponseStat{}, fmt.Errorf("failed to execute the request: %w", err)
	}
	defer res.Body.Close()

	// Reset the buffer and gzip writer so they can be reused in case
	// document level retries are needed.
	b.resetBuf()

	// Record the number of flushed bytes only when err == nil. The body may
	// not have been sent otherwise.
	b.bytesFlushed = bytesFlushed
	b.bytesUncompFlushed = bytesUncompFlushed

	var resp BulkIndexerResponseStat
	if res.StatusCode > 299 {
		var s string
		if b.config.IncludeSourceOnError == Unset {
			var er struct {
				Error struct {
					Type     string `json:"type,omitempty"`
					Reason   string `json:"reason,omitempty"`
					CausedBy struct {
						Type   string `json:"type,omitempty"`
						Reason string `json:"reason,omitempty"`
					} `json:"caused_by,omitempty"`
				} `json:"error,omitempty"`
			}

			// A body that cannot be decoded is not an error here; the
			// returned ErrorFlushFailed simply carries an empty response.
			if err := jsoniter.NewDecoder(res.Body).Decode(&er); err == nil {
				// Drop the reason fields and keep only the error types.
				er.Error.Reason = ""
				er.Error.CausedBy.Reason = ""

				// The marshal error is ignored: er only contains string
				// fields, and a nil result yields an empty string anyway.
				redacted, _ := json.Marshal(&er)
				s = string(redacted)
			}
		} else {
			body, err := io.ReadAll(res.Body)
			if err != nil {
				return BulkIndexerResponseStat{}, fmt.Errorf("failed to read response body: %w", err)
			}
			s = string(body)
		}
		e := ErrorFlushFailed{resp: s, statusCode: res.StatusCode}
		switch {
		case res.StatusCode == 429:
			e.tooMany = true
		case res.StatusCode >= 500:
			e.serverError = true
		case res.StatusCode >= 400:
			// 429 has already been handled by the first case.
			e.clientError = true
		}
		return resp, e
	}

	if err := jsoniter.NewDecoder(res.Body).Decode(&resp); err != nil {
		return resp, fmt.Errorf("error decoding bulk response: %w", err)
	}

	if b.config.IncludeSourceOnError == Unset {
		for i := range resp.FailedDocs {
			resp.FailedDocs[i].Error.Reason = ""
		}
	}

	if !needsPayloadCopy {
		return resp, nil
	}

	// Eliminate previous retry counts that aren't present in the bulk
	// request response.
	for pos := range b.retryCounts {
		found := false
		for _, doc := range resp.FailedDocs {
			if doc.Position == pos {
				found = true
				break
			}
		}
		if !found {
			// Retried request succeeded, remove from retry counts
			delete(b.retryCounts, pos)
		}
	}

	var gr *gzip.Reader
	if b.gzipw != nil {
		gr, err = gzip.NewReader(bytes.NewReader(b.copyBuf))
		if err != nil {
			return resp, fmt.Errorf("failed to decompress request payload: %w", err)
		}
		defer gr.Close()
	}

	// buf holds the current chunk of decompressed payload (gzip only).
	buf := make([]byte, 0, 4096)

	// lastln and lastIdx track, for the uncompressed payload, the line and
	// byte offset up to which copyBuf has already been consumed.
	lastln := 0
	lastIdx := 0
	// seen is the number of newlines in the decompressed chunks that were
	// read before the current content of buf; the compressed payload is
	// being read lazily.
	seen := 0

	// fill replaces buf with the next chunk of decompressed payload. It is
	// only called when a newline that is still needed has not been read yet,
	// so an exhausted stream is reported as an error instead of being
	// retried forever.
	fill := func() error {
		n, err := gr.Read(buf[:cap(buf)])
		if err != nil && err != io.EOF {
			return fmt.Errorf("failed to read from compressed buffer: %w", err)
		}
		if n == 0 && err == io.EOF {
			return fmt.Errorf("failed to read from compressed buffer: %w", io.ErrUnexpectedEOF)
		}
		buf = buf[:n]
		return nil
	}

	// write writes p to w and reports a failed write instead of silently
	// dropping (part of) a document.
	write := func(w io.Writer, p []byte) error {
		if _, err := w.Write(p); err != nil {
			return fmt.Errorf("failed to write bulk item: %w", err)
		}
		return nil
	}

	// writeItemAtPos writes the 2 lines for document at position `pos` in bulk request to io.Writer `to`
	writeItemAtPos := func(to io.Writer, pos int) error {
		// there are two lines for each document:
		// - action
		// - document
		//
		// Find the document by looking up the newline separators.
		// First the newline (if exists) before the 'action' then the
		// newline at the end of the 'document' line.
		startln := pos * 2
		endln := startln + 2

		if b.gzipw == nil {
			rest := b.copyBuf[lastIdx:]
			startIdx := indexnth(rest, startln-lastln, '\n') + 1
			endIdx := indexnth(rest, endln-lastln, '\n') + 1

			if err := write(to, rest[startIdx:endIdx]); err != nil {
				return err
			}

			lastln = endln
			lastIdx += endIdx
			return nil
		}

		// First call, read the first chunk from the gzip reader
		if len(buf) == 0 {
			if err := fill(); err != nil {
				return err
			}
		}

		// newlines in the current buf
		newlines := bytes.Count(buf, []byte{'\n'})

		// loop until we've seen the start newline
		for seen+newlines < startln {
			seen += newlines
			if err := fill(); err != nil {
				return err
			}
			newlines = bytes.Count(buf, []byte{'\n'})
		}

		startIdx := indexnth(buf, startln-seen, '\n') + 1
		endIdx := indexnth(buf, endln-seen, '\n') + 1

		// If the end newline is in the buffer write the event
		if endIdx != 0 {
			return write(to, buf[startIdx:endIdx])
		}

		// The end newline is not in the buffer: write what we have and
		// read more data.
		if err := write(to, buf[startIdx:]); err != nil {
			return err
		}

		// loop until we've seen the end newline
		for seen+newlines < endln {
			seen += newlines
			if err := fill(); err != nil {
				return err
			}
			newlines = bytes.Count(buf, []byte{'\n'})
			if seen+newlines < endln {
				// endln is not here, write what we have and keep going
				if err := write(to, buf); err != nil {
					return err
				}
			}
		}

		// try again to find the end newline in the extra data
		// we just read.
		endIdx = indexnth(buf, endln-seen, '\n') + 1
		return write(to, buf[:endIdx])
	}

	// inputBuf is only used to populate failed item input
	var inputBuf bytes.Buffer

	// nonRetriable reuses the backing array of resp.FailedDocs. This is safe
	// because it never holds more items than have already been iterated.
	nonRetriable := resp.FailedDocs[:0]
	// appendNonRetriable appends an item deemed non-retriable to the nonRetriable slice.
	// At the same time, it optionally populates item.Input with the action and document lines of the document.
	appendNonRetriable := func(item BulkIndexerResponseItem) error {
		var err error
		if b.config.PopulateFailedDocsInput {
			inputBuf.Reset()
			// In an ideal world, PopulateFailedDocsInput failures should not cause a flush failure.
			// But since this is only for debugging / testing, and any writeItemAtPos would reveal a bug in the code,
			// fail fast and explicitly.
			err = writeItemAtPos(&inputBuf, item.Position)
			item.Input = inputBuf.String()
		}
		nonRetriable = append(nonRetriable, item)
		return err
	}

	for _, item := range resp.FailedDocs {
		if b.config.MaxDocumentRetries == 0 || !b.shouldRetryOnStatus(item.Status) {
			// If it's not a retriable error, treat the document as failed
			if err := appendNonRetriable(item); err != nil {
				return resp, err
			}
			continue
		}

		// Increment retry count for the positions found.
		count := b.retryCounts[item.Position] + 1
		// check if we are above the maxDocumentRetry setting
		if count > b.config.MaxDocumentRetries {
			// do not retry, return the document as failed
			if err := appendNonRetriable(item); err != nil {
				return resp, err
			}
			continue
		}
		if resp.GreatestRetry < count {
			resp.GreatestRetry = count
		}

		// Since some items may have succeeded, counter positions need
		// to be updated to match the next current buffer position.
		b.retryCounts[b.itemsAdded] = count

		if err := writeItemAtPos(b.writer, item.Position); err != nil {
			return resp, err
		}
		resp.RetriedDocs++
		b.itemsAdded++
	}

	// FailedDocs contain responses of
	// - non-retriable errors
	// - retriable errors that reached the retry limit
	resp.FailedDocs = nonRetriable

	return resp, nil
}