pkg/controller/beat/common/stackmon/stackmon.go (105 lines of code) (raw):

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one // or more contributor license agreements. Licensed under the Elastic License 2.0; // you may not use this file except in compliance with the Elastic License 2.0. package stackmon import ( "context" _ "embed" // for the beats config files "errors" "fmt" "github.com/elastic/cloud-on-k8s/v3/pkg/apis/beat/v1beta1" esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/association" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/stackmon" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/stackmon/monitoring" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/version" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/volume" "github.com/elastic/cloud-on-k8s/v3/pkg/controller/elasticsearch/bootstrap" "github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s" ) const ( FilebeatLogsVolumeName string = "filebeat-logs" FilebeatLogsVolumeMountPath string = "/usr/share/filebeat/logs" MetricbeatLogsVolumeName string = "metricbeat-logs" MetricbeatLogsVolumeMountPath string = "/usr/share/metricbeat/logs" ) var ( // filebeatConfig is a static configuration for Filebeat to collect Beats logs //go:embed filebeat.yml filebeatConfig string // metricbeatConfigTemplate is a configuration template for Metricbeat to collect monitoring data from Beats resources //go:embed metricbeat.tpl.yml metricbeatConfigTemplate string // ErrMonitoringClusterUUIDUnavailable will be returned when the UUID for the Beat ElasticsearchRef cluster // has not yet been assigned a UUID. This could happen on a newly created Elasticsearch cluster. ErrMonitoringClusterUUIDUnavailable = errors.New("cluster UUID for Beats stack monitoring is unavailable") ) func Filebeat(ctx context.Context, client k8s.Client, resource monitoring.HasMonitoring, version string) (stackmon.BeatSidecar, error) { sidecar, err := stackmon.NewFileBeatSidecar(ctx, client, resource, version, filebeatConfig, nil) if err != nil { return stackmon.BeatSidecar{}, err } // Add shared volume for logs consumption. filebeatLogsVolume := volume.NewEmptyDirVolume(FilebeatLogsVolumeName, FilebeatLogsVolumeMountPath) sidecar.Container.VolumeMounts = append(sidecar.Container.VolumeMounts, filebeatLogsVolume.VolumeMount()) sidecar.Volumes = append(sidecar.Volumes, filebeatLogsVolume.Volume()) return sidecar, nil } func MetricBeat(ctx context.Context, client k8s.Client, beat *v1beta1.Beat) (stackmon.BeatSidecar, error) { if err := beat.ElasticsearchRef().IsValid(); err != nil { return stackmon.BeatSidecar{}, err } uuid, err := associatedESUUID(ctx, client, beat) if err != nil { return stackmon.BeatSidecar{}, err } data := struct { ClusterUUID string URL string }{ ClusterUUID: uuid, // https://www.elastic.co/guide/en/beats/metricbeat/current/configuration-metricbeat.html#module-http-config-options // Beat module http options require "http+" to be appended to unix sockets. URL: fmt.Sprintf("http+%s", GetStackMonitoringSocketURL(beat)), } v, err := version.Parse(beat.Spec.Version) if err != nil { return stackmon.BeatSidecar{}, err } cfg, err := stackmon.RenderTemplate(v, metricbeatConfigTemplate, data) if err != nil { return stackmon.BeatSidecar{}, err } sidecar, err := stackmon.NewMetricBeatSidecar(ctx, client, beat, v, nil, cfg) if err != nil { return stackmon.BeatSidecar{}, err } // Add shared volume for Unix socket between containers. sharedDataVolume := volume.NewEmptyDirVolume("shared-data", "/var/shared") sidecar.Container.VolumeMounts = append(sidecar.Container.VolumeMounts, sharedDataVolume.VolumeMount()) sidecar.Volumes = append(sidecar.Volumes, sharedDataVolume.Volume()) // Add shared volume for logs consumption. metricbeatLogsVolume := volume.NewEmptyDirVolume(MetricbeatLogsVolumeName, MetricbeatLogsVolumeMountPath) sidecar.Volumes = append(sidecar.Volumes, metricbeatLogsVolume.Volume()) sidecar.Container.VolumeMounts = append(sidecar.Container.VolumeMounts, metricbeatLogsVolume.VolumeMount()) return sidecar, nil } type clusterUUIDResponse struct { ClusterUUID string `json:"cluster_uuid"` } func associatedESUUID(ctx context.Context, client k8s.Client, beat *v1beta1.Beat) (string, error) { esAssociation := beat.EsAssociation() esRef := esAssociation.AssociationRef() if !esRef.IsDefined() { // no association or indirect association e.g. via output configuration return "", nil } if esRef.IsExternal() { remoteES, err := association.GetUnmanagedAssociationConnectionInfoFromSecret(client, esAssociation) if err != nil { return "", fmt.Errorf("while retrieving external ES connection info: %w", err) } clusterUUIDResponse := &clusterUUIDResponse{} if err := remoteES.Request("/", clusterUUIDResponse); err != nil { return "", fmt.Errorf("while retrieving remote cluster UUID %w", err) } return clusterUUIDResponse.ClusterUUID, nil } var es esv1.Elasticsearch if err := client.Get(ctx, esRef.NamespacedName(), &es); err != nil { return "", err } uuid, ok := es.Annotations[bootstrap.ClusterUUIDAnnotationName] if !ok { // returning specific error here so this operation can be retried. return "", ErrMonitoringClusterUUIDUnavailable } return uuid, nil } // GetStackMonitoringSocketURL will return a path to a Unix socket that will be used to expose and query metrics. // Unix sockets are used instead of network ports to avoid situations where "hostNetwork: true" is enabled on multiple // Beat daemonsets, along with stack monitoring, which will cause 2 pods to try and bind to the same port on the // Node's host network, which will cause bind errors. (bind: address already in use) func GetStackMonitoringSocketURL(beat *v1beta1.Beat) string { // TODO: Enable when Beats as containers in Windows is supported: https://github.com/elastic/beats/issues/16814 // if runtime.GOOS == "windows" { // return fmt.Sprintf("npipe:///%s-%s-%s.sock", beat.Spec.Type, beat.GetNamespace(), beat.GetName()) // } return fmt.Sprintf("unix:///var/shared/%s-%s-%s.sock", beat.Spec.Type, beat.GetNamespace(), beat.GetName()) }