testing/estools/elasticsearch.go (585 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package estools import ( "bytes" "context" "encoding/json" "fmt" "io" "strconv" "strings" "github.com/gofrs/uuid/v5" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-transport-go/v8/elastictransport" "github.com/elastic/go-elasticsearch/v8/esapi" ) // Index is the data associated with a single index from _cat/indicies type Index struct { Health string `json:"health"` Status string `json:"status"` Index string `json:"index"` UUID string `json:"uuid"` Primary CatIntData `json:"pri"` Replicated CatIntData `json:"rep"` DocsCount CatIntData `json:"docs.count"` DocsDeleted CatIntData `json:"docs.deleted"` StoreSizeBytes CatIntData `json:"store.size"` PrimaryStoreSizeBytes CatIntData `json:"pri.store.size"` } // CatIntData represents a shard/doc/byte count in Index{} type CatIntData int64 // UnmarshalJSON implements the custom unmarshal JSON interface // kind of dumb, but ES wraps ints in quotes, so we have to manually turn them into ints func (s *CatIntData) UnmarshalJSON(b []byte) error { cleaned := strings.Trim(string(b), "\"") res, err := strconv.ParseInt(cleaned, 10, 64) if err != nil { return fmt.Errorf("error unmarshalling JSON for string '%s': %w", cleaned, err) } *s = CatIntData(res) return nil } // Documents represents the complete response from an ES query type Documents struct { Took int `json:"took"` TimedOut bool `json:"timed_out"` Hits Hits `json:"hits"` } // Hits returns the matching documents from an ES query type Hits struct { Hits []ESDoc `json:"hits"` Total TotalDocCount `json:"total"` } // TotalDocCount contains metadata for the ES response type TotalDocCount struct { Value int `json:"value"` Relation string `json:"relation"` } // ESDoc contains the documents returned by an ES query type ESDoc struct { Index string `json:"_index"` Score float64 `json:"_score"` Source map[string]interface{} `json:"_source"` } // TemplateResponse is the body of a template data request type TemplateResponse struct { IndexTemplates []Template `json:"index_templates"` } // Template is an individual template type Template struct { Name string `json:"name"` IndexTemplate map[string]interface{} `json:"index_template"` } // Pipeline is an individual pipeline type Pipeline struct { Description string `json:"description"` Processors []map[string]interface{} `json:"processors"` } // Ping returns basic ES info type Ping struct { Name string `json:"name"` ClusterName string `json:"cluster_name"` ClusterUUID string `json:"cluster_uuid"` Version Version `json:"version"` } // Version contains version and build info from an ES ping type Version struct { Number string `json:"number"` BuildFlavor string `json:"build_flavor"` } // APIKeyRequest contains the needed data to create an API key in Elasticsearch type APIKeyRequest struct { Name string `json:"name"` Expiration string `json:"expiration"` RoleDescriptors mapstr.M `json:"role_descriptors,omitempty"` Metadata mapstr.M `json:"metadata,omitempty"` } // APIKeyResponse contains the response data for an API request type APIKeyResponse struct { ID string `json:"id"` Name string `json:"name"` Expiration int `json:"expiration"` APIKey string `json:"api_key"` Encoded string `json:"encoded"` } // DataStreams represents the response from an ES _data_stream API type DataStreams struct { DataStreams []DataStream `json:"data_streams"` } // DataStream represents a data stream template type DataStream struct { Name string `json:"name"` Indicies []map[string]string `json:"indicies"` Status string `json:"status"` Template string `json:"template"` Lifecycle Lifecycle `json:"lifecycle"` Hidden bool `json:"hidden"` System bool `json:"system"` } type Lifecycle struct { Enabled bool `json:"enabled"` DataRetention string `json:"data_retention"` } // GetAllindicies returns a list of indicies on the target ES instance func GetAllindicies(ctx context.Context, client elastictransport.Interface) ([]Index, error) { return GetIndicesWithContext(ctx, client, []string{}) } // GetIndicesWithContext returns a list of indicies on the target ES instance with the provided context func GetIndicesWithContext(ctx context.Context, client elastictransport.Interface, indicies []string) ([]Index, error) { req := esapi.CatIndicesRequest{Format: "json", Bytes: "b"} if len(indicies) > 0 { req.Index = indicies req.ExpandWildcards = "all" } resp, err := req.Do(ctx, client) if err != nil { return nil, fmt.Errorf("error performing cat query: %w", err) } if resp.StatusCode >= 300 || resp.StatusCode < 200 { return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", resp.StatusCode, resp.String()) } buf, err := io.ReadAll(resp.Body) if err != nil { return nil, fmt.Errorf("error reading response: %w", err) } respData := []Index{} err = json.Unmarshal(buf, &respData) if err != nil { return nil, fmt.Errorf("error unmarshaling response: %w", err) } return respData, nil } // CreateAPIKey creates an API key with the given request data func CreateAPIKey(ctx context.Context, client elastictransport.Interface, req APIKeyRequest) (APIKeyResponse, error) { var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(req) if err != nil { return APIKeyResponse{}, fmt.Errorf("error creating ES query: %w", err) } apiReq := esapi.SecurityCreateAPIKeyRequest{Body: &buf} resp, err := apiReq.Do(ctx, client) if err != nil { return APIKeyResponse{}, fmt.Errorf("error creating API key: %w", err) } defer resp.Body.Close() resultBuf, err := handleResponseRaw(resp) if err != nil { return APIKeyResponse{}, fmt.Errorf("error handling HTTP response: %w", err) } parsed := APIKeyResponse{} err = json.Unmarshal(resultBuf, &parsed) if err != nil { return parsed, fmt.Errorf("error unmarshaling json response: %w", err) } return parsed, nil } func CreateServiceToken(ctx context.Context, client elastictransport.Interface, service string) (string, error) { req := esapi.SecurityCreateServiceTokenRequest{ Namespace: "elastic", Service: service, Name: uuid.Must(uuid.NewV4()).String(), // FIXME(michel-laterman): We need to specify a random name until an upstream issue is fixed: https://github.com/elastic/go-elasticsearch/issues/861 } resp, err := req.Do(ctx, client) if err != nil { return "", fmt.Errorf("error creating service token: %w", err) } defer resp.Body.Close() resultBuf, err := handleResponseRaw(resp) if err != nil { return "", fmt.Errorf("error handling HTTP response: %w", err) } var parsed struct { Token struct { Value string `json:"value"` } `json:"token"` } err = json.Unmarshal(resultBuf, &parsed) if err != nil { return "", fmt.Errorf("error unmarshaling json response: %w", err) } return parsed.Token.Value, nil } // FindMatchingLogLines returns any logs with message fields that match the given line func FindMatchingLogLines(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) { return FindMatchingLogLinesWithContext(ctx, client, namespace, line) } // GetLatestDocumentMatchingQuery returns the last document that matches the given query. // the query field is inserted into a simple `query` POST request func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport.Interface, query map[string]interface{}, indexPattern string) (Documents, error) { queryRaw := map[string]interface{}{ "query": query, "sort": map[string]interface{}{ "@timestamp": "desc", }, "size": 1, } var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } return PerformQueryForRawQuery(ctx, queryRaw, indexPattern, client) } // GetIndexTemplatesForPattern lists all index templates on the system func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.Interface, name string) (TemplateResponse, error) { req := esapi.IndicesGetIndexTemplateRequest{Name: name} resp, err := req.Do(ctx, client) if err != nil { return TemplateResponse{}, fmt.Errorf("error fetching index templates: %w", err) } defer resp.Body.Close() resultBuf, err := handleResponseRaw(resp) if err != nil { return TemplateResponse{}, fmt.Errorf("error handling HTTP response: %w", err) } parsed := TemplateResponse{} err = json.Unmarshal(resultBuf, &parsed) if err != nil { return TemplateResponse{}, fmt.Errorf("error unmarshaling json response: %w", err) } return parsed, nil } func GetDataStreamsForPattern(ctx context.Context, client elastictransport.Interface, namePattern string) (DataStreams, error) { req := esapi.IndicesGetDataStreamRequest{Name: []string{namePattern}, ExpandWildcards: "all,hidden"} resp, err := req.Do(ctx, client) if err != nil { return DataStreams{}, fmt.Errorf("error fetching data streams") } defer resp.Body.Close() raw, err := handleResponseRaw(resp) if err != nil { return DataStreams{}, fmt.Errorf("error handling HTTP response for data stream get: %w", err) } data := DataStreams{} err = json.Unmarshal(raw, &data) if err != nil { return DataStreams{}, fmt.Errorf("error unmarshalling datastream: %w", err) } return data, nil } // DeleteIndexTemplatesDataStreams deletes any data streams, then associcated index templates. func DeleteIndexTemplatesDataStreams(ctx context.Context, client elastictransport.Interface, name string) error { req := esapi.IndicesDeleteDataStreamRequest{Name: []string{name}, ExpandWildcards: "all,hidden"} resp, err := req.Do(ctx, client) if err != nil { return fmt.Errorf("error deleting data streams: %w", err) } defer resp.Body.Close() _, err = handleResponseRaw(resp) if err != nil { return fmt.Errorf("error handling HTTP response for data stream delete: %w", err) } patternReq := esapi.IndicesDeleteIndexTemplateRequest{Name: name} resp, err = patternReq.Do(ctx, client) if err != nil { return fmt.Errorf("error deleting index templates: %w", err) } defer resp.Body.Close() _, err = handleResponseRaw(resp) if err != nil { return fmt.Errorf("error handling HTTP response for index template delete: %w", err) } return nil } // GetPipelines returns a list of installed pipelines that match the given name/pattern func GetPipelines(ctx context.Context, client elastictransport.Interface, name string) (map[string]Pipeline, error) { req := esapi.IngestGetPipelineRequest{PipelineID: name} resp, err := req.Do(ctx, client) if err != nil { return nil, fmt.Errorf("error fetching index templates: %w", err) } defer resp.Body.Close() resultBuf, err := handleResponseRaw(resp) if err != nil { return nil, fmt.Errorf("error handling HTTP response: %w", err) } parsed := map[string]Pipeline{} err = json.Unmarshal(resultBuf, &parsed) if err != nil { return nil, fmt.Errorf("error unmarshaling json response: %w", err) } return parsed, nil } // DeletePipelines deletes all pipelines that match the given pattern func DeletePipelines(ctx context.Context, client elastictransport.Interface, name string) error { req := esapi.IngestDeletePipelineRequest{PipelineID: name} resp, err := req.Do(ctx, client) if err != nil { return fmt.Errorf("error deleting index template") } defer resp.Body.Close() _, err = handleResponseRaw(resp) if err != nil { return fmt.Errorf("error handling HTTP response: %w", err) } return nil } // FindMatchingLogLinesWithContext returns any logs with message fields that match the given line func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) { queryRaw := map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ { "match_phrase": map[string]interface{}{ "message": line, }, }, { "term": map[string]interface{}{ "data_stream.namespace": map[string]interface{}{ "value": namespace, }, }, }, }, }, }, } var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client) } // CheckForErrorsInLogs checks to see if any error-level lines exist // excludeStrings can be used to remove any particular error strings from logs func CheckForErrorsInLogs(ctx context.Context, client elastictransport.Interface, namespace string, excludeStrings []string) (Documents, error) { return CheckForErrorsInLogsWithContext(ctx, client, namespace, excludeStrings) } // CheckForErrorsInLogsWithContext checks to see if any error-level lines exist // excludeStrings can be used to remove any particular error strings from logs func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictransport.Interface, namespace string, excludeStrings []string) (Documents, error) { filters := map[string]interface{}{ "must": []map[string]interface{}{ { "match": map[string]interface{}{ "log.level": "error", }, }, { "term": map[string]interface{}{ "data_stream.namespace": map[string]interface{}{ "value": namespace, }, }, }, }, } if len(excludeStrings) > 0 { excludeStatements := []map[string]interface{}{} for _, ex := range excludeStrings { excludeStatements = append(excludeStatements, map[string]interface{}{ "match_phrase": map[string]interface{}{ "message": ex, }, }) } filters["must_not"] = excludeStatements } queryRaw := map[string]interface{}{ "query": map[string]interface{}{ "bool": filters, }, } var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client) } // GetLogsForDataset returns any logs associated with the datastream func GetLogsForDataset(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { return GetLogsForDatasetWithContext(ctx, client, index) } // GetLogsForAgentID returns any logs associated with the agent ID func GetLogsForAgentID(ctx context.Context, client elastictransport.Interface, id string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ "data_stream.dataset": "elastic_agent.*", }, }, } var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(indexQuery) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } es := esapi.New(client) res, err := es.Search( es.Search.WithIndex("logs-elastic_agent*"), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), es.Search.WithContext(ctx), es.Search.WithQuery(fmt.Sprintf(`elastic_agent.id:%s`, id)), // magic number, we try to get all entries it helps debugging test failures es.Search.WithSize(300), ) if err != nil { return Documents{}, fmt.Errorf("error performing ES search: %w", err) } return handleDocsResponse(res) } // GetResultsForAgentAndDatastream returns any documents match both the given agent ID and data stream func GetResultsForAgentAndDatastream(ctx context.Context, client elastictransport.Interface, dataset string, agentID string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ { "match": map[string]interface{}{"data_stream.dataset": dataset}, }, { "match": map[string]interface{}{"agent.id": agentID}, }, }, }, }, } var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(indexQuery) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } es := esapi.New(client) res, err := es.Search( es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithContext(ctx), es.Search.WithSize(300), ) if err != nil { return Documents{}, fmt.Errorf("error performing ES search: %w", err) } return handleDocsResponse(res) } // GetLogsForDatasetWithContext returns any logs associated with the datastream func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ "data_stream.dataset": index, }, }, } return PerformQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client) } // GetLogsForIndexWithContext returns any logs that match the given condition func GetLogsForIndexWithContext(ctx context.Context, client elastictransport.Interface, index string, match map[string]interface{}) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match": match, }, } return PerformQueryForRawQuery(ctx, indexQuery, index, client) } // GetAllLogsForIndexWithContext returns all logs for a given index func GetAllLogsForIndexWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match_all": map[string]interface{}{}, }, } return PerformQueryForRawQuery(ctx, indexQuery, index, client) } // GetPing performs a basic ping and returns ES config info func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, error) { req := esapi.InfoRequest{} resp, err := req.Do(ctx, client) if err != nil { return Ping{}, fmt.Errorf("error in ping request") } defer resp.Body.Close() respData, err := handleResponseRaw(resp) if err != nil { return Ping{}, fmt.Errorf("error in HTTP response: %w", err) } pingData := Ping{} err = json.Unmarshal(respData, &pingData) if err != nil { return pingData, fmt.Errorf("error unmarshalling JSON: %w", err) } return pingData, nil } // PerformQueryForRawQuery executes the ES query specified by queryRaw func PerformQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) { var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } es := esapi.New(client) res, err := es.Search( es.Search.WithIndex(index), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), es.Search.WithContext(ctx), es.Search.WithSize(300), ) if err != nil { return Documents{}, fmt.Errorf("error performing ES search: %w", err) } return handleDocsResponse(res) } // FindMatchingLogLinesForAgentWithContext returns the matching `message` line field for an agent with the matching ID func FindMatchingLogLinesForAgentWithContext(ctx context.Context, client elastictransport.Interface, agentID, line string) (Documents, error) { queryRaw := map[string]interface{}{ "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ { "match_phrase": map[string]interface{}{ "message": line, }, }, { "term": map[string]interface{}{ "agent.id": map[string]interface{}{ "value": agentID, }, }, }, }, }, }, } var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } return PerformQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client) } // GetLogsForDatastream returns any logs associated with the datastream func GetLogsForDatastream( ctx context.Context, client elastictransport.Interface, dsType, dataset, namespace string) (Documents, error) { query := map[string]any{ "_source": []string{"message"}, "query": map[string]any{ "bool": map[string]any{ "must": []any{ map[string]any{ "match": map[string]any{ "data_stream.dataset": dataset, }, }, map[string]any{ "match": map[string]any{ "data_stream.namespace": namespace, }, }, map[string]any{ "match": map[string]any{ "data_stream.type": dsType, }, }, }, }, }, } var buf bytes.Buffer if err := json.NewEncoder(&buf).Encode(query); err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } es := esapi.New(client) res, err := es.Search( es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), es.Search.WithPretty(), es.Search.WithContext(ctx), ) if err != nil { return Documents{}, fmt.Errorf("error performing ES search: %w", err) } return handleDocsResponse(res) } // handleDocsResponse converts the esapi.Response into Documents, // it closes the response.Body after reading func handleDocsResponse(res *esapi.Response) (Documents, error) { resultBuf, err := handleResponseRaw(res) if err != nil { return Documents{}, fmt.Errorf("error in HTTP query: %w", err) } respData := Documents{} err = json.Unmarshal(resultBuf, &respData) if err != nil { return Documents{}, fmt.Errorf("error unmarshaling response: %w", err) } return respData, err } func handleResponseRaw(res *esapi.Response) ([]byte, error) { if res.StatusCode >= 300 || res.StatusCode < 200 { return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) } resultBuf, err := io.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("error reading response body: %w", err) } return resultBuf, nil }