libbeat/monitoring/report/elasticsearch/elasticsearch.go (287 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package elasticsearch import ( "context" "errors" "io" "math/rand/v2" "strconv" "time" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/esleg/eslegclient" "github.com/elastic/beats/v7/libbeat/monitoring/report" "github.com/elastic/beats/v7/libbeat/outputs" "github.com/elastic/beats/v7/libbeat/publisher/pipeline" "github.com/elastic/beats/v7/libbeat/publisher/processing" conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-agent-libs/monitoring" "github.com/elastic/elastic-agent-libs/transport/httpcommon" ) type reporter struct { done *stopper logger *logp.Logger checkRetry time.Duration // event metadata beatMeta mapstr.M tags []string // pipeline pipeline *pipeline.Pipeline client beat.Client out []outputs.NetworkClient } const logSelector = "monitoring" var errNoMonitoring = errors.New("xpack monitoring not available") func init() { report.RegisterReporterFactory("elasticsearch", makeReporter) } func defaultConfig(settings report.Settings) config { c := config{ Hosts: nil, Protocol: "http", Params: nil, Headers: nil, Username: "beats_system", Password: "", APIKey: "", ProxyURL: "", CompressionLevel: 0, MaxRetries: 3, MetricsPeriod: 10 * time.Second, StatePeriod: 1 * time.Minute, BulkMaxSize: 50, BufferSize: 50, Tags: nil, Backoff: backoff{ Init: 1 * time.Second, Max: 60 * time.Second, }, ClusterUUID: settings.ClusterUUID, Transport: httpcommon.DefaultHTTPTransportSettings(), } if settings.DefaultUsername != "" { c.Username = settings.DefaultUsername } return c } func makeReporter(beat beat.Info, settings report.Settings, cfg *conf.C) (report.Reporter, error) { log := beat.Logger.Named(logSelector) config := defaultConfig(settings) if err := cfg.Unpack(&config); err != nil { return nil, err } // Unset username which is set by default, even if no password is set if config.APIKey != "" { config.Username = "" config.Password = "" } // check endpoint availability on startup only every 30 seconds checkRetry := 30 * time.Second windowSize := config.BulkMaxSize - 1 if windowSize <= 0 { windowSize = 1 } params := makeClientParams(config) hosts, err := outputs.ReadHostList(cfg) if err != nil { return nil, err } if len(hosts) == 0 { return nil, errors.New("empty hosts list") } clients := make([]outputs.NetworkClient, len(hosts)) for i, host := range hosts { client, err := makeClient(host, params, &config, beat) if err != nil { return nil, err } clients[i] = client } monitoring := monitoring.Default.GetRegistry("monitoring") outClient := outputs.NewFailoverClient(clients) outClient = outputs.WithBackoff(outClient, config.Backoff.Init, config.Backoff.Max) processing, err := processing.MakeDefaultSupport(true, nil)(beat, log, conf.NewConfig()) if err != nil { return nil, err } queueConfig := conf.Namespace{} conf, err := conf.NewConfigFrom(map[string]interface{}{ "mem.events": 32, "mem.flush.min_events": 1, }) if err != nil { return nil, err } err = queueConfig.Unpack(conf) if err != nil { return nil, err } pipeline, err := pipeline.New( beat, pipeline.Monitors{ Metrics: monitoring, Logger: log, }, queueConfig, outputs.Group{ Clients: []outputs.Client{outClient}, BatchSize: windowSize, Retry: 0, // no retry. Drop event on error. }, pipeline.Settings{ WaitClose: 0, WaitCloseMode: pipeline.NoWaitOnClose, Processors: processing, }) if err != nil { return nil, err } pipeConn, err := pipeline.Connect() if err != nil { pipeline.Close() return nil, err } r := &reporter{ logger: log, done: newStopper(), beatMeta: makeMeta(beat), tags: config.Tags, checkRetry: checkRetry, pipeline: pipeline, client: pipeConn, out: clients, } go r.initLoop(config) return r, nil } func (r *reporter) Stop() { r.done.Stop() r.client.Close() r.pipeline.Close() } func (r *reporter) initLoop(c config) { r.logger.Debug("Start monitoring endpoint init loop.") defer r.logger.Debug("Finish monitoring endpoint init loop.") log := r.logger logged := false for { // Select one configured endpoint by random and check if xpack is available ctx, cancel := context.WithCancel(context.Background()) defer cancel() client := r.out[rand.IntN(len(r.out))] err := client.Connect(ctx) if err == nil { closing(log, client) break } else { if !logged { log.Info("Failed to connect to Elastic X-Pack Monitoring. Either Elasticsearch X-Pack monitoring is not enabled or Elasticsearch is not available. Will keep retrying. Error: ", err) logged = true } r.logger.Debugf("Monitoring could not connect to Elasticsearch, failed with %+v", err) } select { case <-r.done.C(): return case <-time.After(r.checkRetry): } } log.Info("Successfully connected to X-Pack Monitoring endpoint.") // Start collector and send loop if monitoring endpoint has been found. go r.snapshotLoop("state", "state", c.StatePeriod, c.ClusterUUID) // For backward compatibility stats is named to metrics. go r.snapshotLoop("stats", "metrics", c.MetricsPeriod, c.ClusterUUID) } func (r *reporter) snapshotLoop(namespace, prefix string, period time.Duration, clusterUUID string) { ticker := time.NewTicker(period) defer ticker.Stop() log := r.logger log.Infof("Start monitoring %s metrics snapshot loop with period %s.", namespace, period) defer log.Infof("Stop monitoring %s metrics snapshot loop.", namespace) for { var ts time.Time select { case <-r.done.C(): return case ts = <-ticker.C: } snapshot := makeSnapshot(monitoring.GetNamespace(namespace).GetRegistry()) if snapshot == nil { log.Debug("Empty snapshot.") continue } fields := mapstr.M{ "beat": r.beatMeta, prefix: snapshot, } if len(r.tags) > 0 { fields["tags"] = r.tags } meta := mapstr.M{ "type": "beats_" + namespace, "interval_ms": int64(period / time.Millisecond), // Converting to seconds as interval only accepts `s` as unit "params": map[string]string{"interval": strconv.Itoa(int(period/time.Second)) + "s"}, } if clusterUUID == "" { clusterUUID = getClusterUUID() } if clusterUUID != "" { _, _ = meta.Put("cluster_uuid", clusterUUID) } r.client.Publish(beat.Event{ Timestamp: ts, Fields: fields, Meta: meta, }) } } func makeClient(host string, params map[string]string, config *config, beat beat.Info) (outputs.NetworkClient, error) { url, err := common.MakeURL(config.Protocol, "", host, 9200) if err != nil { return nil, err } esClient, err := eslegclient.NewConnection(eslegclient.ConnectionSettings{ URL: url, Beatname: beat.Beat, Username: config.Username, Password: config.Password, APIKey: config.APIKey, Parameters: params, Headers: config.Headers, CompressionLevel: config.CompressionLevel, Transport: config.Transport, UserAgent: beat.UserAgent, }) if err != nil { return nil, err } return newPublishClient(esClient, params, beat.Logger) } func closing(log *logp.Logger, c io.Closer) { if err := c.Close(); err != nil { log.Warnf("Closed failed with: %v", err) } } func makeMeta(beat beat.Info) mapstr.M { return mapstr.M{ "type": beat.Beat, "version": beat.Version, "name": beat.Name, "host": beat.Hostname, "uuid": beat.ID, } } func getClusterUUID() string { stateRegistry := monitoring.GetNamespace("state").GetRegistry() outputsRegistry := stateRegistry.GetRegistry("outputs") if outputsRegistry == nil { return "" } elasticsearchRegistry := outputsRegistry.GetRegistry("elasticsearch") if elasticsearchRegistry == nil { return "" } snapshot := monitoring.CollectFlatSnapshot(elasticsearchRegistry, monitoring.Full, false) return snapshot.Strings["cluster_uuid"] } func makeClientParams(config config) map[string]string { params := map[string]string{} for k, v := range config.Params { params[k] = v } return params }