docappendertest/docappendertest.go (198 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//	http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package docappendertest provides helpers for tests: decoding
// Elasticsearch /_bulk request bodies and mocking an Elasticsearch
// server/client, plus small OTel metric assertion utilities.
package docappendertest

import (
	"bufio"
	"compress/gzip"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"net/url"
	"strings"
	"sync/atomic"
	"testing"

	"github.com/elastic/elastic-transport-go/v8/elastictransport"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"go.elastic.co/apm/module/apmelasticsearch/v2"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

// BulkRequestItemMeta holds the action and metadata decoded from a single
// action line of a /_bulk request body.
type BulkRequestItemMeta struct {
	// Action is the bulk action name (the top-level key of the action
	// line, e.g. "index" or "create"). It is populated by the decoder,
	// not unmarshalled from JSON.
	Action string `json:"-"`
	// Index is the target index or data stream name.
	Index string `json:"_index"`
	// DocumentID is the document's _id, if specified.
	DocumentID string `json:"_id"`
	// Pipeline is the ingest pipeline specified for the item, if any.
	Pipeline string `json:"pipeline"`
	// DynamicTemplates maps field paths to dynamic template names.
	DynamicTemplates map[string]string `json:"dynamic_templates"`
	// RequireDataStream indicates the item requires a data stream target.
	RequireDataStream bool `json:"require_data_stream"`
}

// TimestampFormat holds the time format for formatting timestamps according to
// Elasticsearch's strict_date_optional_time date format, which includes a fractional
// seconds component.
const TimestampFormat = "2006-01-02T15:04:05.000Z07:00"

// BulkIndexerResponse models the Elasticsearch response body for a
// /_bulk request.
type BulkIndexerResponse struct {
	Took      int                                  `json:"took"`
	HasErrors bool                                 `json:"errors"`
	Items     []map[string]BulkIndexerResponseItem `json:"items,omitempty"`
}

// BulkIndexerResponseItem represents the Elasticsearch response item.
type BulkIndexerResponseItem struct { Index string `json:"_index"` DocumentID string `json:"_id"` Version int64 `json:"_version"` Result string `json:"result"` Status int `json:"status"` SeqNo int64 `json:"_seq_no"` PrimTerm int64 `json:"_primary_term"` FailureStore string `json:"failure_store,omitempty"` Shards struct { Total int `json:"total"` Successful int `json:"successful"` Failed int `json:"failed"` } `json:"_shards"` Error struct { Type string `json:"type"` Reason string `json:"reason"` Cause struct { Type string `json:"type"` Reason string `json:"reason"` } `json:"caused_by"` } `json:"error,omitempty"` } // DecodeBulkRequest decodes a /_bulk request's body, returning the decoded documents and a response body. func DecodeBulkRequest(r *http.Request) ([][]byte, BulkIndexerResponse) { indexed, result, _ := DecodeBulkRequestWithStats(r) return indexed, result } // DecodeBulkRequestWithStats decodes a /_bulk request's body, returning the decoded documents // and a response body and stats about request. func DecodeBulkRequestWithStats(r *http.Request) ( docs [][]byte, res BulkIndexerResponse, stats RequestStats) { indexed, result, stats, _ := DecodeBulkRequestWithStatsAndDynamicTemplates(r) return indexed, result, stats } // DecodeBulkRequestWithStatsAndMeta decodes a /_bulk request's body, // returning the decoded bulk request action/meta and documents, // and a response body and stats about the request. func DecodeBulkRequestWithStatsAndMeta(r *http.Request) ( docs [][]byte, meta []BulkRequestItemMeta, res BulkIndexerResponse, stats RequestStats, ) { return decodeBulkRequest(r) } // DecodeBulkRequestWithStatsAndDynamicTemplates decodes a /_bulk request's body, // returning the decoded documents and a response body and stats about request, and per-request dynamic templates. 
func DecodeBulkRequestWithStatsAndDynamicTemplates(r *http.Request) ( docs [][]byte, res BulkIndexerResponse, stats RequestStats, dynamicTemplates []map[string]string) { indexed, result, stats, dynamicTemplates, _ := DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r) return indexed, result, stats, dynamicTemplates } // DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines decodes a /_bulk request's body, // returning the decoded documents and a response body and stats about request, per-request dynamic templates and pipelines specified in the event. func DecodeBulkRequestWithStatsAndDynamicTemplatesAndPipelines(r *http.Request) ( docs [][]byte, res BulkIndexerResponse, stats RequestStats, dynamicTemplates []map[string]string, pipelines []string, ) { docs, meta, res, stats := decodeBulkRequest(r) for _, meta := range meta { dynamicTemplates = append(dynamicTemplates, meta.DynamicTemplates) pipelines = append(pipelines, meta.Pipeline) } return docs, res, stats, dynamicTemplates, pipelines } func decodeBulkRequest(r *http.Request) ( docs [][]byte, meta []BulkRequestItemMeta, result BulkIndexerResponse, stats RequestStats, ) { body := r.Body switch r.Header.Get("Content-Encoding") { case "gzip": r, err := gzip.NewReader(body) if err != nil { panic(err) } defer r.Close() body = r } cr := &countReader{ ReadCloser: body, } body = cr defer cr.Close() scanner := bufio.NewScanner(body) for scanner.Scan() { action := make(map[string]BulkRequestItemMeta) if err := json.NewDecoder(strings.NewReader(scanner.Text())).Decode(&action); err != nil { panic(err) } var actionType string for actionType = range action { } if !scanner.Scan() { panic("expected source") } doc := append([]byte{}, scanner.Bytes()...) 
if !json.Valid(doc) { panic(fmt.Errorf("invalid JSON: %s", doc)) } docs = append(docs, doc) item := BulkIndexerResponseItem{Status: http.StatusCreated, Index: action[actionType].Index} result.Items = append(result.Items, map[string]BulkIndexerResponseItem{actionType: item}) itemMeta := action[actionType] itemMeta.Action = actionType meta = append(meta, itemMeta) } return docs, meta, result, RequestStats{int64(cr.bytesRead)} } // NewMockElasticsearchClient returns an elasticsearch.Client which sends /_bulk requests to bulkHandler. func NewMockElasticsearchClient(t testing.TB, bulkHandler http.HandlerFunc) *elastictransport.Client { config := NewMockElasticsearchClientConfig(t, bulkHandler) client, err := elastictransport.New(config) require.NoError(t, err) return client } // NewMockElasticsearchClientConfig starts an httptest.Server, and returns an elasticsearch.Config which // sends /_bulk requests to bulkHandler. The httptest.Server will be closed via t.Cleanup. func NewMockElasticsearchClientConfig(t testing.TB, bulkHandler http.HandlerFunc) elastictransport.Config { mux := http.NewServeMux() HandleBulk(mux, bulkHandler) srv := httptest.NewServer(mux) t.Cleanup(srv.Close) u, err := url.Parse(srv.URL) require.NoError(t, err) config := elastictransport.Config{} config.URLs = []*url.URL{u} config.DisableRetry = true config.Transport = apmelasticsearch.WrapRoundTripper(http.DefaultTransport) return config } // HandleBulk registers bulkHandler with mux for handling /_bulk requests, // wrapping bulkHandler to conform with go-elasticsearch version checking. 
func HandleBulk(mux *http.ServeMux, bulkHandler http.HandlerFunc) { mux.HandleFunc("/_bulk", func(w http.ResponseWriter, r *http.Request) { w.Header().Set("X-Elastic-Product", "Elasticsearch") bulkHandler.ServeHTTP(w, r) }) } // NewAssertCounter returns a function that canbe used to assert counter metrics func NewAssertCounter(t testing.TB, asserted *atomic.Int64) func(metric metricdata.Metrics, count int64, attrs attribute.Set) { t.Helper() return func(metric metricdata.Metrics, count int64, attrs attribute.Set) { asserted.Add(1) counter := metric.Data.(metricdata.Sum[int64]) for _, dp := range counter.DataPoints { assert.Equal(t, count, dp.Value) assert.Equal(t, attrs, dp.Attributes) } } } // AssertOTelMetrics asserts OTel metrics using a closure. func AssertOTelMetrics(t testing.TB, ms []metricdata.Metrics, assert func(m metricdata.Metrics)) { t.Helper() for _, m := range ms { assert(m) } } // helper reader to keep track of the bytes read by ReadCloser type countReader struct { bytesRead int io.ReadCloser } // Read implements the [io.Reader] interface. func (c *countReader) Read(p []byte) (int, error) { n, err := c.ReadCloser.Read(p) c.bytesRead += n return n, err } type RequestStats struct { UncompressedBytes int64 }