in bulk_indexer.go [383:418]
// newBulkIndexRequest builds the POST request for "/_bulk" from the indexer's
// currently buffered bytes and its configuration.
//
// The request carries a JSON Content-Type header. The query string always
// contains filter_path; pipeline, require_data_stream and
// include_source_on_error are added only when the matching config field is set.
// An error is returned only if the underlying http.NewRequestWithContext fails.
func (b *BulkIndexer) newBulkIndexRequest(ctx context.Context) (*http.Request, error) {
	// Hand the HTTP layer a separate reader over the buffered bytes instead of
	// b.buf itself: the indexer reuses b.buf, and the http client/transport may
	// keep reading the body after `req.Do` has returned (e.g. on HTTP errors,
	// where the server need not consume the whole body before responding).
	// Sharing the bytes.Buffer would then mean concurrent reads/writes of its
	// internal state (b.buf.off, b.buf.lastRead), i.e. undefined behavior and
	// possible panics.
	// See: https://github.com/golang/go/issues/51907
	body := bytes.NewReader(b.buf.Bytes())

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/_bulk", body)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Content-Type", "application/json")

	params := req.URL.Query()
	params.Set("filter_path", "items.*._index,items.*.status,items.*.failure_store,items.*.error.type,items.*.error.reason")
	if pipeline := b.config.Pipeline; pipeline != "" {
		params.Set("pipeline", pipeline)
	}
	if b.config.RequireDataStream {
		params.Set("require_data_stream", strconv.FormatBool(b.config.RequireDataStream))
	}
	switch b.config.IncludeSourceOnError {
	case Unset:
		// Not configured: leave the parameter off the request entirely.
	case True:
		params.Set("include_source_on_error", "true")
	case False:
		params.Set("include_source_on_error", "false")
	}
	req.URL.RawQuery = params.Encode()

	return req, nil
}