metricbeat/module/elasticsearch/elasticsearch.go (420 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package elasticsearch import ( "encoding/json" "errors" "fmt" "net/url" "strings" "sync" "time" "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/elastic" "github.com/elastic/beats/v7/metricbeat/mb" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/version" ) func init() { // Register the ModuleFactory function for this module. if err := mb.Registry.AddModule(ModuleName, NewModule); err != nil { panic(err) } } // NewModule creates a new module. func NewModule(base mb.BaseModule) (mb.Module, error) { xpackEnabledMetricSets := []string{ "ccr", "enrich", "cluster_stats", "index", "index_recovery", "index_summary", "ml_job", "node_stats", "shard", } optionalXpackMetricsets := []string{"ingest_pipeline"} return elastic.NewModule(&base, xpackEnabledMetricSets, optionalXpackMetricsets, base.Logger.Named(ModuleName)) } var ( // CCRStatsAPIAvailableVersion is the version of Elasticsearch since when the CCR stats API is available. CCRStatsAPIAvailableVersion = version.MustNew("6.5.0") // EnrichStatsAPIAvailableVersion is the version of Elasticsearch since when the Enrich stats API is available. EnrichStatsAPIAvailableVersion = version.MustNew("7.5.0") // BulkStatsAvailableVersion is the version since when bulk indexing stats are available BulkStatsAvailableVersion = version.MustNew("8.0.0") //ExpandWildcardsHiddenAvailableVersion is the version since when the "expand_wildcards" query parameter to // the Indices Stats API can accept "hidden" as a value. ExpandWildcardsHiddenAvailableVersion = version.MustNew("7.7.0") // Global clusterIdCache. Assumption is that the same node id never can belong to a different cluster id. clusterIDCache = map[string]string{} ) // ModuleName is the name of this module. const ModuleName = "elasticsearch" // Info construct contains the data from the Elasticsearch / endpoint type Info struct { ClusterName string `json:"cluster_name"` ClusterID string `json:"cluster_uuid"` Version Version `json:"version"` Name string `json:"name"` } // Version contains the semver formatted version of ES type Version struct { Number *version.V `json:"number"` } // NodeInfo struct cotains data about the node. type NodeInfo struct { Host string `json:"host"` TransportAddress string `json:"transport_address"` IP string `json:"ip"` Name string `json:"name"` ID string } // License contains data about the Elasticsearch license type License struct { Status string `json:"status"` ID string `json:"uid"` Type string `json:"type"` IssueDate *time.Time `json:"issue_date"` IssueDateInMillis int `json:"issue_date_in_millis"` ExpiryDate *time.Time `json:"expiry_date,omitempty"` ExpiryDateInMillis int `json:"expiry_date_in_millis,omitempty"` MaxNodes int `json:"max_nodes,omitempty"` MaxResourceUnits int `json:"max_resource_units,omitempty"` IssuedTo string `json:"issued_to"` Issuer string `json:"issuer"` StartDateInMillis int `json:"start_date_in_millis"` } type licenseWrapper struct { License License `json:"license"` } // GetClusterID fetches cluster id for given nodeID. func GetClusterID(http *helper.HTTP, uri string, nodeID string) (string, error) { // Check if cluster id already cached. If yes, return it. if clusterID, ok := clusterIDCache[nodeID]; ok { return clusterID, nil } info, err := GetInfo(http, uri) if err != nil { return "", err } clusterIDCache[nodeID] = info.ClusterID return info.ClusterID, nil } // isMaster checks if the given node host is a master node. // // The detection of the master is done in two steps: // * Fetch node name from /_nodes/_local/name // * Fetch current master name from cluster state /_cluster/state/master_node // // The two names are compared func isMaster(http *helper.HTTP, uri string) (bool, error) { node, err := getNodeName(http, uri) if err != nil { return false, err } master, err := getMasterName(http, uri) if err != nil { return false, err } return master == node, nil } func getNodeName(http *helper.HTTP, uri string) (string, error) { content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "") if err != nil { return "", err } nodesStruct := struct { Nodes map[string]interface{} `json:"nodes"` }{} err = json.Unmarshal(content, &nodesStruct) if err != nil { return "", err } // _local will only fetch one node info. First entry is node name for k := range nodesStruct.Nodes { return k, nil } return "", fmt.Errorf("no local node found") } func getMasterName(http *helper.HTTP, uri string) (string, error) { content, err := fetchPath(http, uri, "_cluster/state/master_node", "local=true") if err != nil { return "", err } clusterStruct := struct { MasterNode string `json:"master_node"` }{} err = json.Unmarshal(content, &clusterStruct) if err != nil { return "", err } return clusterStruct.MasterNode, nil } // GetInfo returns the data for the Elasticsearch / endpoint. func GetInfo(http *helper.HTTP, uri string) (Info, error) { content, err := fetchPath(http, uri, "/", "") if err != nil { return Info{}, err } info := Info{} err = json.Unmarshal(content, &info) if err != nil { return Info{}, err } return info, nil } func fetchPath(http *helper.HTTP, uri, path string, query string) ([]byte, error) { defer http.SetURI(uri) // Parses the uri to replace the path u, _ := url.Parse(uri) u.Path = path u.RawQuery = query // Http helper includes the HostData with username and password http.SetURI(u.String()) return http.FetchContent() } // GetNodeInfo returns the node information. func GetNodeInfo(http *helper.HTTP, uri string, nodeID string) (*NodeInfo, error) { content, err := fetchPath(http, uri, "/_nodes/_local/nodes", "") if err != nil { return nil, err } nodesStruct := struct { Nodes map[string]*NodeInfo `json:"nodes"` }{} err = json.Unmarshal(content, &nodesStruct) if err != nil { return nil, err } // _local will only fetch one node info. First entry is node name for k, v := range nodesStruct.Nodes { // In case the nodeID is empty, first node info will be returned if k == nodeID || nodeID == "" { v.ID = k return v, nil } } return nil, fmt.Errorf("no node matched id %s", nodeID) } // GetLicense returns license information. Since we don't expect license information // to change frequently, the information is cached for 1 minute to avoid // hitting Elasticsearch frequently. func GetLicense(http *helper.HTTP, resetURI string) (*License, error) { // First, check the cache license := licenseCache.get() // License found in cache, return it if license != nil { return license, nil } // License not found in cache, fetch it from Elasticsearch content, err := fetchPath(http, resetURI, "_license", "") if err != nil { return nil, err } var data licenseWrapper err = json.Unmarshal(content, &data) if err != nil { return nil, err } // Cache license for a minute license = &data.License licenseCache.set(license, time.Minute) return license, nil } // GetClusterState returns cluster state information. func GetClusterState(http *helper.HTTP, resetURI string, metrics []string, filterPaths []string) (mapstr.M, error) { queryParams := []string{"local=true"} clusterStateURI := "_cluster/state" if len(metrics) > 0 { clusterStateURI += "/" + strings.Join(metrics, ",") } if len(filterPaths) > 0 { filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",") queryParams = append(queryParams, filterPathQueryParam) } queryString := strings.Join(queryParams, "&") content, err := fetchPath(http, resetURI, clusterStateURI, queryString) if err != nil { return nil, err } var clusterState map[string]interface{} err = json.Unmarshal(content, &clusterState) return clusterState, err } func GetIndexSettings(http *helper.HTTP, resetURI string, indexPattern string, filterPaths []string) (mapstr.M, error) { queryParams := []string{"local=true", "expand_wildcards=hidden,all"} indicesSettingsURI := indexPattern + "/_settings" if len(filterPaths) > 0 { filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",") queryParams = append(queryParams, filterPathQueryParam) } queryString := strings.Join(queryParams, "&") content, err := fetchPath(http, resetURI, indicesSettingsURI, queryString) if err != nil { return nil, err } var indicesSettings map[string]interface{} err = json.Unmarshal(content, &indicesSettings) return indicesSettings, err } // GetClusterSettingsWithDefaults returns cluster settings. func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (mapstr.M, error) { return GetClusterSettings(http, resetURI, true, filterPaths) } // GetClusterSettings returns cluster settings func GetClusterSettings(http *helper.HTTP, resetURI string, includeDefaults bool, filterPaths []string) (mapstr.M, error) { clusterSettingsURI := "_cluster/settings" var queryParams []string if includeDefaults { queryParams = append(queryParams, "include_defaults=true") } if len(filterPaths) > 0 { filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",") queryParams = append(queryParams, filterPathQueryParam) } queryString := strings.Join(queryParams, "&") content, err := fetchPath(http, resetURI, clusterSettingsURI, queryString) if err != nil { return nil, err } var clusterSettings map[string]interface{} err = json.Unmarshal(content, &clusterSettings) return clusterSettings, err } // GetStackUsage returns stack usage information. func GetStackUsage(http *helper.HTTP, resetURI string) (map[string]interface{}, error) { content, err := fetchPath(http, resetURI, "_xpack/usage", "") if err != nil { return nil, err } var stackUsage map[string]interface{} err = json.Unmarshal(content, &stackUsage) return stackUsage, err } type XPack struct { Features struct { CCR struct { Enabled bool `json:"enabled"` } `json:"CCR"` ML struct { Enabled bool `json:"enabled"` } `json:"ml"` } `json:"features"` } // GetXPack returns information about xpack features. func GetXPack(http *helper.HTTP, resetURI string) (XPack, error) { content, err := fetchPath(http, resetURI, "_xpack", "") if err != nil { return XPack{}, err } var xpack XPack err = json.Unmarshal(content, &xpack) return xpack, err } // IsMLockAllEnabled returns if the given Elasticsearch node has mlockall enabled func IsMLockAllEnabled(http *helper.HTTP, resetURI, nodeID string) (bool, error) { content, err := fetchPath(http, resetURI, "_nodes/"+nodeID, "filter_path=nodes.*.process.mlockall") if err != nil { return false, err } var response map[string]map[string]map[string]map[string]bool err = json.Unmarshal(content, &response) if err != nil { return false, err } for _, nodeInfo := range response["nodes"] { mlockall := nodeInfo["process"]["mlockall"] return mlockall, nil } return false, fmt.Errorf("could not determine if mlockall is enabled on node ID = %v", nodeID) } // GetMasterNodeID returns the ID of the Elasticsearch cluster's master node func GetMasterNodeID(http *helper.HTTP, resetURI string) (string, error) { content, err := fetchPath(http, resetURI, "_nodes/_master", "filter_path=nodes.*.name") if err != nil { return "", err } var response struct { Nodes map[string]interface{} `json:"nodes"` } if err := json.Unmarshal(content, &response); err != nil { return "", err } for nodeID := range response.Nodes { return nodeID, nil } return "", errors.New("could not determine master node ID") } // PassThruField copies the field at the given path from the given source data object into // the same path in the given target data object. func PassThruField(fieldPath string, sourceData, targetData mapstr.M) error { fieldValue, err := sourceData.GetValue(fieldPath) if err != nil { return elastic.MakeErrorForMissingField(fieldPath, elastic.Elasticsearch) } targetData.Put(fieldPath, fieldValue) return nil } // MergeClusterSettings merges cluster settings in the correct precedence order func MergeClusterSettings(clusterSettings mapstr.M) (mapstr.M, error) { transientSettings, err := getSettingGroup(clusterSettings, "transient") if err != nil { return nil, err } persistentSettings, err := getSettingGroup(clusterSettings, "persistent") if err != nil { return nil, err } settings, err := getSettingGroup(clusterSettings, "default") if err != nil { return nil, err } // Transient settings override persistent settings which override default settings if settings == nil { settings = persistentSettings } if settings == nil { settings = transientSettings } if settings == nil { return nil, nil } if persistentSettings != nil { settings.DeepUpdate(persistentSettings) } if transientSettings != nil { settings.DeepUpdate(transientSettings) } return settings, nil } var ( // Global cache for license information. Assumption is that license information changes infrequently. licenseCache = &_licenseCache{} // LicenseCacheEnabled controls whether license caching is enabled or not. Intended for test use. LicenseCacheEnabled = true ) type _licenseCache struct { sync.RWMutex license *License cachedOn time.Time ttl time.Duration } func (c *_licenseCache) get() *License { c.Lock() defer c.Unlock() if time.Since(c.cachedOn) > c.ttl { // We are past the TTL, so invalidate cache c.license = nil } return c.license } func (c *_licenseCache) set(license *License, ttl time.Duration) { if !LicenseCacheEnabled { return } c.Lock() defer c.Unlock() c.license = license c.ttl = ttl c.cachedOn = time.Now() } // IsOneOf returns whether the license is one of the specified candidate licenses func (l *License) IsOneOf(candidateLicenses ...string) bool { t := l.Type for _, candidateLicense := range candidateLicenses { if candidateLicense == t { return true } } return false } // ToMapStr converts the license to a mapstr.M. This is necessary // for proper marshaling of the data before it's sent over the wire. In // particular it ensures that ms-since-epoch values are marshaled as longs // and not floats in scientific notation as Elasticsearch does not like that. func (l *License) ToMapStr() mapstr.M { m := mapstr.M{ "status": l.Status, "uid": l.ID, "type": l.Type, "issue_date": l.IssueDate, "issue_date_in_millis": l.IssueDateInMillis, "expiry_date": l.ExpiryDate, "issued_to": l.IssuedTo, "issuer": l.Issuer, "start_date_in_millis": l.StartDateInMillis, } if l.ExpiryDateInMillis != 0 { // We don't want to record a 0 expiry date as this means the license has expired // in the Stack Monitoring UI m["expiry_date_in_millis"] = l.ExpiryDateInMillis } // Enterprise licenses have max_resource_units. All other licenses have // max_nodes. if l.MaxNodes != 0 { m["max_nodes"] = l.MaxNodes } if l.MaxResourceUnits != 0 { m["max_resource_units"] = l.MaxResourceUnits } return m } func getSettingGroup(allSettings mapstr.M, groupKey string) (mapstr.M, error) { hasSettingGroup, err := allSettings.HasKey(groupKey) if err != nil { return nil, fmt.Errorf("failure to determine if %s settings exist: %w", groupKey, err) } if !hasSettingGroup { return nil, nil } settings, err := allSettings.GetValue(groupKey) if err != nil { return nil, fmt.Errorf("failure to extract %s settings: %w", groupKey, err) } v, ok := settings.(map[string]interface{}) if !ok { return nil, fmt.Errorf("%s settings are not a map", groupKey) } return mapstr.M(v), nil }