functionaltests/internal/esclient/client.go (183 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Package esclient provides a thin wrapper around the go-elasticsearch
// typed client, exposing helpers for API keys, ingest pipelines, data
// streams, document counts and error-log searches.
package esclient

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"net/http"
	"time"

	"github.com/elastic/go-elasticsearch/v8"
	"github.com/elastic/go-elasticsearch/v8/typedapi/core/search"
	"github.com/elastic/go-elasticsearch/v8/typedapi/esql/query"
	"github.com/elastic/go-elasticsearch/v8/typedapi/indices/rollover"
	"github.com/elastic/go-elasticsearch/v8/typedapi/ingest/putpipeline"
	"github.com/elastic/go-elasticsearch/v8/typedapi/security/createapikey"
	"github.com/elastic/go-elasticsearch/v8/typedapi/types"
)

// Client wraps an Elasticsearch typed client. Create one with New; the
// zero value has a nil underlying client and is not usable.
type Client struct {
	// es is the typed client every method of Client delegates to.
	es *elasticsearch.TypedClient
}

// New returns a new Client for accessing Elasticsearch.
func New(esURL, username, password string) (*Client, error) { transport := http.DefaultTransport.(*http.Transport).Clone() es, err := elasticsearch.NewTypedClient(elasticsearch.Config{ Addresses: []string{esURL}, Username: username, Password: password, Transport: transport, }) if err != nil { return nil, fmt.Errorf("error creating Elasticsearch client: %w", err) } return &Client{es: es}, nil } var elasticsearchTimeUnits = []struct { Duration time.Duration Unit string }{ {time.Hour, "h"}, {time.Minute, "m"}, {time.Second, "s"}, {time.Millisecond, "ms"}, {time.Microsecond, "micros"}, } // formatDurationElasticsearch formats a duration using // Elasticsearch supported time units. // // See https://www.elastic.co/guide/en/elasticsearch/reference/current/api-conventions.html#time-units func formatDurationElasticsearch(d time.Duration) string { for _, tu := range elasticsearchTimeUnits { if d%tu.Duration == 0 { return fmt.Sprintf("%d%s", d/tu.Duration, tu.Unit) } } return fmt.Sprintf("%dnanos", d) } // CreateAPIKey creates an API Key, and returns it in the base64-encoded form // that agents should provide. // // If expiration is less than or equal to zero, then the API Key never expires. func (c *Client) CreateAPIKey(ctx context.Context, name string, expiration time.Duration, roles map[string]types.RoleDescriptor) (string, error) { var maybeExpiration types.Duration if expiration > 0 { maybeExpiration = formatDurationElasticsearch(expiration) } resp, err := c.es.Security.CreateApiKey().Request(&createapikey.Request{ Name: &name, Expiration: maybeExpiration, RoleDescriptors: roles, Metadata: map[string]json.RawMessage{ "creator": []byte(`"apmclient"`), }, }).Do(ctx) if err != nil { return "", fmt.Errorf("error creating API key: %w", err) } return resp.Encoded, nil } // CreateIngestPipeline creates a new pipeline with the provided name and processors. // // Refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/ingest.html. 
func (c *Client) CreateIngestPipeline(ctx context.Context, pipeline string, processors []types.ProcessorContainer) error { _, err := c.es.Ingest.PutPipeline(pipeline). Request(&putpipeline.Request{Processors: processors}). Do(ctx) if err != nil { return fmt.Errorf("error creating ingest pipeline for %s: %w", pipeline, err) } return nil } // PerformManualRollover performs an immediate manual rollover for the specified data stream. // // Refer to https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-rollover-index.html. func (c *Client) PerformManualRollover(ctx context.Context, dataStream string) error { _, err := c.es.Indices.Rollover(dataStream).Request(&rollover.Request{}).Do(ctx) if err != nil { return fmt.Errorf("error performing manual rollover for %s: %w", dataStream, err) } return nil } func (c *Client) GetDataStream(ctx context.Context, name string) ([]types.DataStream, error) { resp, err := c.es.Indices.GetDataStream().Name(name).Do(ctx) if err != nil { return []types.DataStream{}, fmt.Errorf("cannot GET datastream: %w", err) } return resp.DataStreams, nil } // DocCount is used to unmarshal response from ES|QL query. type DocCount struct { DataStream string Count int } // DataStreamsDocCount is an easy to assert on format reporting document count // for data streams. type DataStreamsDocCount map[string]int // APMDSDocCount retrieves the document count per data stream of all APM data streams. 
func (c *Client) APMDSDocCount(ctx context.Context) (DataStreamsDocCount, error) { q := `FROM traces-apm*,apm-*,traces-*.otel-*,logs-apm*,apm-*,logs-*.otel-*,metrics-apm*,apm-*,metrics-*.otel-* | WHERE data_stream.type IS NOT NULL | EVAL datastream = CONCAT(data_stream.type, "-", data_stream.dataset, "-", data_stream.namespace) | STATS count = COUNT(*) BY datastream | SORT count DESC` qry := c.es.Esql.Query().Query(q) resp, err := query.Helper[DocCount](ctx, qry) if err != nil { var eserr *types.ElasticsearchError // suppress this error as it only indicates no data is available yet. expected := `Found 1 problem line 1:1: Unknown index [traces-apm*,apm-*,traces-*.otel-*,logs-apm*,apm-*,logs-*.otel-*,metrics-apm*,apm-*,metrics-*.otel-*]` if errors.As(err, &eserr) && eserr.ErrorCause.Reason != nil && *eserr.ErrorCause.Reason == expected { return DataStreamsDocCount{}, nil } return DataStreamsDocCount{}, fmt.Errorf("cannot retrieve APM doc count: %w", err) } res := DataStreamsDocCount{} for _, dc := range resp { res[dc.DataStream] = dc.Count } return res, nil } // GetESErrorLogs retrieves Elasticsearch error logs. // The search query is on the Index used by Elasticsearch monitoring to store logs. // exclude allows to pass in must_not clauses to be applied to the query to filter // the returned results. func (c *Client) GetESErrorLogs(ctx context.Context, exclude ...types.Query) (*search.Response, error) { // There is an issue in ES: https://github.com/elastic/elasticsearch/issues/125445, // that is causing deprecation logger bulk write failures. // The error itself is harmless and irrelevant to APM, so we can ignore it. // TODO: Remove this query once the above issue is fixed. exclude = append(exclude, types.Query{ Match: map[string]types.MatchQuery{ "message": {Query: "Bulk write of deprecation logs encountered some failures"}, }}) size := 100 res, err := c.es.Search(). Index("elastic-cloud-logs-*"). 
Request(&search.Request{ Size: &size, Query: &types.Query{ Bool: &types.BoolQuery{ Must: []types.Query{ { Match: map[string]types.MatchQuery{ "service.type": {Query: "elasticsearch"}, }, }, { QueryString: &types.QueryStringQuery{ Query: `log.level: ("error" OR "ERROR")`, }, }, }, MustNot: exclude, }, }, }).Do(ctx) if err != nil { return search.NewResponse(), fmt.Errorf("cannot run search query: %w", err) } return res, nil } // GetAPMErrorLogs retrieves Elasticsearch error logs. // The search query is on the Index used by Elasticsearch monitoring to store logs. // exclude allows to pass in must_not clauses to be applied to the query to filter // the returned results. func (c *Client) GetAPMErrorLogs(ctx context.Context, exclude ...types.Query) (*search.Response, error) { size := 100 res, err := c.es.Search(). Index("elastic-cloud-logs-*"). Request(&search.Request{ Size: &size, Query: &types.Query{ Bool: &types.BoolQuery{ Must: []types.Query{ { Match: map[string]types.MatchQuery{ "service.name": {Query: "apm-server"}, }, }, { QueryString: &types.QueryStringQuery{ Query: `log.level: ("error" OR "ERROR")`, }, }, }, MustNot: exclude, }, }, }).Do(ctx) if err != nil { return search.NewResponse(), fmt.Errorf("cannot run search query: %w", err) } return res, nil }