func()

in bulk_indexer.go [383:418]


// newBulkIndexRequest builds the POST request for "/_bulk" whose body is the
// indexer's currently buffered bytes and whose query string is derived from
// b.config. It returns the error from http.NewRequestWithContext unchanged if
// the request cannot be constructed.
func (b *BulkIndexer) newBulkIndexRequest(ctx context.Context) (*http.Request, error) {
	// The HTTP layer must never be handed b.buf itself: the indexer reuses that
	// buffer, while the client/transport may still be reading the body after
	// `req.Do` has returned (for example on HTTP errors, where the server is not
	// required to consume the whole request body before responding). Sharing the
	// bytes.Buffer would then mean concurrent reads/writes of its internal
	// fields (off, lastRead), i.e. undefined behavior and possible panics.
	// A separate reader over the buffered bytes keeps its own read position.
	// See: https://github.com/golang/go/issues/51907
	body := bytes.NewReader(b.buf.Bytes())

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/_bulk", body)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Content-Type", "application/json")

	// Encode() sorts parameters by key, so the order of the Set calls below
	// has no effect on the resulting query string.
	query := req.URL.Query()

	// filter_path is sent on every request, independent of configuration.
	query.Set("filter_path", "items.*._index,items.*.status,items.*.failure_store,items.*.error.type,items.*.error.reason")

	if pipeline := b.config.Pipeline; pipeline != "" {
		query.Set("pipeline", pipeline)
	}
	if requireDataStream := b.config.RequireDataStream; requireDataStream {
		query.Set("require_data_stream", strconv.FormatBool(requireDataStream))
	}

	// include_source_on_error is only sent when explicitly configured; any
	// value other than False or True leaves the parameter out.
	switch mode := b.config.IncludeSourceOnError; {
	case mode == Unset:
		// Not configured: omit the parameter.
	case mode == False:
		query.Set("include_source_on_error", "false")
	case mode == True:
		query.Set("include_source_on_error", "true")
	}

	req.URL.RawQuery = query.Encode()
	return req, nil
}