pkg/telemetry/telemetry.go

// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package telemetry

import (
	"context"
	"fmt"
	"strings"
	"time"

	"github.com/ghodss/yaml"
	"go.elastic.co/apm/v2"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"github.com/elastic/cloud-on-k8s/v3/pkg/about"
	agentv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/agent/v1alpha1"
	apmv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/apm/v1"
	esav1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/autoscaling/v1alpha1"
	beatv1beta1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/beat/v1beta1"
	esv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/elasticsearch/v1"
	entv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/enterprisesearch/v1"
	kbv1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/kibana/v1"
	logstashv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/logstash/v1alpha1"
	mapsv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/maps/v1alpha1"
	policyv1alpha1 "github.com/elastic/cloud-on-k8s/v3/pkg/apis/stackconfigpolicy/v1alpha1"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/reconciler"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/stackmon/monitoring"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/common/tracing"
	"github.com/elastic/cloud-on-k8s/v3/pkg/controller/kibana"
	"github.com/elastic/cloud-on-k8s/v3/pkg/license"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/k8s"
	ulog "github.com/elastic/cloud-on-k8s/v3/pkg/utils/log"
	"github.com/elastic/cloud-on-k8s/v3/pkg/utils/set"
)

const (
	resourceCount            = "resource_count"
	podCount                 = "pod_count"
	helmManagedResourceCount = "helm_resource_count"
	timestampFieldName       = "timestamp"
)

// ECKTelemetry is the top-level document written to the Kibana telemetry file.
type ECKTelemetry struct {
	ECK ECK `json:"eck"`
}

// ECK groups operator build information, per-resource usage statistics and license data.
type ECK struct {
	about.OperatorInfo
	Stats   map[string]interface{} `json:"stats"`
	License map[string]string      `json:"license"`
}

// getStatsFn collects usage statistics for one resource kind across the managed namespaces
// and returns the top-level key under which they are reported.
type getStatsFn func(k8s.Client, []string) (string, interface{}, error)

// NewReporter returns a Reporter that periodically collects usage statistics and
// makes them available to all managed Kibana instances.
func NewReporter(
	info about.OperatorInfo,
	client client.Client,
	operatorNamespace string,
	managedNamespaces []string,
	telemetryInterval time.Duration,
	tracer *apm.Tracer,
) Reporter {
	if len(managedNamespaces) == 0 {
		// treat no managed namespaces as managing all namespaces, i.e. set an empty string for namespace filtering
		managedNamespaces = append(managedNamespaces, "")
	}
	return Reporter{
		operatorInfo:      info,
		client:            client,
		operatorNamespace: operatorNamespace,
		managedNamespaces: managedNamespaces,
		telemetryInterval: telemetryInterval,
		tracer:            tracer,
	}
}

// Reporter aggregates operator and resource usage data and writes it to the
// config secrets of managed Kibana instances.
type Reporter struct {
	operatorInfo      about.OperatorInfo
	client            k8s.Client
	operatorNamespace string
	managedNamespaces []string
	telemetryInterval time.Duration
	tracer            *apm.Tracer
}

// Start runs the reporting loop; it blocks and reports once per telemetry interval.
func (r *Reporter) Start(ctx context.Context) {
	ctx = ulog.InitInContext(ctx, "telemetry")
	ticker := time.NewTicker(r.telemetryInterval)
	for range ticker.C {
		r.report(ctx)
	}
}

func marshalTelemetry(ctx context.Context, info about.OperatorInfo, stats map[string]interface{}, license map[string]string) ([]byte, error) {
	span, _ := apm.StartSpan(ctx, "marshal_telemetry", tracing.SpanTypeApp)
	defer span.End()

	return yaml.Marshal(ECKTelemetry{
		ECK: ECK{
			OperatorInfo: info,
			Stats:        stats,
			License:      license,
		},
	})
}

func (r *Reporter) getResourceStats(ctx context.Context) (map[string]interface{}, error) {
	span, _ := apm.StartSpan(ctx, "get_resource_stats", tracing.SpanTypeApp)
	defer span.End()

	stats := map[string]interface{}{}
	for _, f := range []getStatsFn{
		esStats,
		kbStats,
		apmStats,
		beatStats,
		entStats,
		agentStats,
		mapsStats,
		scpStats,
		logstashStats,
	} {
		key, statsPart, err := f(r.client, r.managedNamespaces)
		if err != nil {
			return nil, err
		}
		stats[key] = statsPart
	}
	return stats, nil
}

// report collects stats and license info, marshals them and stores the result in
// the config secret of every managed Kibana.
func (r *Reporter) report(ctx context.Context) {
	ctx = tracing.NewContextTransaction(ctx, r.tracer, tracing.PeriodicTxType, "telemetry-reporter", nil)
	defer tracing.EndContextTransaction(ctx)
	log := ulog.FromContext(ctx)

	stats, err := r.getResourceStats(ctx)
	if err != nil {
		log.Error(err, "failed to get resource stats")
		return
	}

	licenseInfo, err := r.getLicenseInfo(ctx)
	if err != nil {
		log.Error(err, "failed to get operator license secret")
		// it's ok to go on
	}

	telemetryBytes, err := marshalTelemetry(ctx, r.operatorInfo, stats, licenseInfo)
	if err != nil {
		log.Error(err, "failed to marshal telemetry data")
		return
	}

	for _, ns := range r.managedNamespaces {
		var kibanaList kbv1.KibanaList
		if err := r.client.List(ctx, &kibanaList, client.InNamespace(ns)); err != nil {
			log.Error(err, "failed to list Kibanas")
			continue
		}
		for _, kb := range kibanaList.Items {
			r.reconcileKibanaSecret(ctx, kb, telemetryBytes)
		}
	}
}

func (r *Reporter) reconcileKibanaSecret(ctx context.Context, kb kbv1.Kibana, telemetryBytes []byte) {
	span, ctx := apm.StartSpan(ctx, "reconcile_kibana_secret", tracing.SpanTypeApp)
	defer span.End()
	log := ulog.FromContext(ctx)

	var secret corev1.Secret
	nsName := types.NamespacedName{Namespace: kb.Namespace, Name: kbv1.ConfigSecret(kb.Name)}
	if err := r.client.Get(ctx, nsName, &secret); err != nil {
		log.Error(err, "failed to get Kibana secret")
		return
	}

	if secret.Data == nil {
		// should not happen, but just to be safe
		secret.Data = make(map[string][]byte)
	}
	secret.Data[kibana.TelemetryFilename] = telemetryBytes

	if _, err := reconciler.ReconcileSecret(ctx, r.client, secret, nil); err != nil {
		log.Error(err, "failed to reconcile Kibana secret")
		return
	}
}

// getLicenseInfo returns the content of the operator licensing config map.
func (r *Reporter) getLicenseInfo(ctx context.Context) (map[string]string, error) {
	span, _ := apm.StartSpan(ctx, "get_license_info", tracing.SpanTypeApp)
	defer span.End()

	nsn := types.NamespacedName{
		Namespace: r.operatorNamespace,
		Name:      license.LicensingCfgMapName,
	}
	var licenseConfigMap corev1.ConfigMap
	if err := r.client.Get(ctx, nsn, &licenseConfigMap); err != nil {
		return nil, err
	}
	// remove timestamp field as it doesn't carry any significant information
	delete(licenseConfigMap.Data, timestampFieldName)
	return licenseConfigMap.Data, nil
}

type downwardNodeLabelsStats struct {
	// ResourceCount is the number of resources which are relying on the node labels downward API.
	ResourceCount int32 `json:"resource_count"`
	// DistinctNodeLabelsCount is the number of distinct labels used.
	DistinctNodeLabelsCount int32 `json:"distinct_node_labels_count"`
}

func esStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	stats := struct {
		ResourceCount               int32                    `json:"resource_count"`
		HelmManagedResourceCount    int32                    `json:"helm_resource_count"`
		PodCount                    int32                    `json:"pod_count"`
		AutoscaledResourceCount     int32                    `json:"autoscaled_resource_count"`
		StackMonitoringLogsCount    int32                    `json:"stack_monitoring_logs_count"`
		StackMonitoringMetricsCount int32                    `json:"stack_monitoring_metrics_count"`
		RemoteClustersCount         int32                    `json:"remote_clusters_count"`
		RemoteClustersAPIKeysCount  int32                    `json:"remote_clusters_api_keys_count"`
		DownwardNodeLabels          *downwardNodeLabelsStats `json:"downward_node_labels,omitempty"`
	}{}

	distinctNodeLabels := set.Make()
	var resourcesWithDownwardLabels int32

	var esList esv1.ElasticsearchList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &esList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, es := range esList.Items {
			es := es
			stats.ResourceCount++
			stats.PodCount += es.Status.AvailableNodes
			rcWithoutAPIKeys, rcWithAPIKeys := es.RemoteClustersCount()
			stats.RemoteClustersCount += rcWithoutAPIKeys
			stats.RemoteClustersAPIKeysCount += rcWithAPIKeys
			if isManagedByHelm(es.Labels) {
				stats.HelmManagedResourceCount++
			}
			if es.IsAutoscalingAnnotationSet() {
				stats.AutoscaledResourceCount++
			}
			if es.HasDownwardNodeLabels() {
				resourcesWithDownwardLabels++
				distinctNodeLabels.MergeWith(set.Make(es.DownwardNodeLabels()...))
			}
			if monitoring.IsLogsDefined(&es) {
				stats.StackMonitoringLogsCount++
			}
			if monitoring.IsMetricsDefined(&es) {
				stats.StackMonitoringMetricsCount++
			}
		}
	}

	if resourcesWithDownwardLabels > 0 {
		stats.DownwardNodeLabels = &downwardNodeLabelsStats{
			ResourceCount:           resourcesWithDownwardLabels,
			DistinctNodeLabelsCount: int32(distinctNodeLabels.Count()),
		}
	}

	var esaList esav1alpha1.ElasticsearchAutoscalerList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &esaList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		stats.AutoscaledResourceCount += int32(len(esaList.Items))
	}

	return "elasticsearches", stats, nil
}

func isManagedByHelm(labels map[string]string) bool {
	if val, ok := labels["helm.sh/chart"]; ok {
		return strings.HasPrefix(val, "eck-elasticsearch-") || strings.HasPrefix(val, "eck-kibana-")
	}
	return false
}

func kbStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	stats := map[string]int32{resourceCount: 0, podCount: 0, helmManagedResourceCount: 0}
	var kbList kbv1.KibanaList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &kbList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, kb := range kbList.Items {
			stats[resourceCount]++
			stats[podCount] += kb.Status.AvailableNodes
			if isManagedByHelm(kb.Labels) {
				stats[helmManagedResourceCount]++
			}
		}
	}
	return "kibanas", stats, nil
}

func apmStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	stats := map[string]int32{resourceCount: 0, podCount: 0}
	var apmList apmv1.ApmServerList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &apmList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, apm := range apmList.Items {
			stats[resourceCount]++
			stats[podCount] += apm.Status.AvailableNodes
		}
	}
	return "apms", stats, nil
}

func beatStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	typeToName := func(typ string) string {
		return fmt.Sprintf("%s_count", typ)
	}

	stats := map[string]int32{resourceCount: 0, podCount: 0}
	for typ := range beatv1beta1.KnownTypes {
		stats[typeToName(typ)] = 0
	}

	var beatList beatv1beta1.BeatList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &beatList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, beat := range beatList.Items {
			stats[resourceCount]++
			stats[typeToName(beat.Spec.Type)]++
			stats[podCount] += beat.Status.AvailableNodes
		}
	}
	return "beats", stats, nil
}

func entStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	stats := map[string]int32{resourceCount: 0, podCount: 0}
	var entList entv1.EnterpriseSearchList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &entList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, ent := range entList.Items {
			stats[resourceCount]++
			stats[podCount] += ent.Status.AvailableNodes
		}
	}
	return "enterprisesearches", stats, nil
}

func agentStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	multipleRefsKey := "multiple_refs"
	fleetModeKey := "fleet_mode"
	fleetServerKey := "fleet_server"

	stats := map[string]int32{resourceCount: 0, podCount: 0, multipleRefsKey: 0}
	var agentList agentv1alpha1.AgentList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &agentList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, agent := range agentList.Items {
			stats[resourceCount]++
			stats[podCount] += agent.Status.AvailableNodes
			if len(agent.Spec.ElasticsearchRefs) > 1 {
				stats[multipleRefsKey]++
			}
			if agent.Spec.FleetModeEnabled() {
				stats[fleetModeKey]++
			}
			if agent.Spec.FleetServerEnabled {
				stats[fleetServerKey]++
			}
		}
	}
	return "agents", stats, nil
}

func logstashStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	const (
		pipelineCount               = "pipeline_count"
		pipelineRefCount            = "pipeline_ref_count"
		serviceCount                = "service_count"
		stackMonitoringLogsCount    = "stack_monitoring_logs_count"
		stackMonitoringMetricsCount = "stack_monitoring_metrics_count"
	)

	stats := map[string]int32{
		resourceCount:               0,
		podCount:                    0,
		stackMonitoringLogsCount:    0,
		stackMonitoringMetricsCount: 0,
		serviceCount:                0,
		pipelineCount:               0,
		pipelineRefCount:            0,
	}

	var logstashList logstashv1alpha1.LogstashList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &logstashList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, ls := range logstashList.Items {
			ls := ls
			stats[resourceCount]++
			stats[serviceCount] += int32(len(ls.Spec.Services))
			stats[podCount] += ls.Status.AvailableNodes
			stats[pipelineCount] += int32(len(ls.Spec.Pipelines))
			if ls.Spec.PipelinesRef != nil {
				stats[pipelineRefCount]++
			}
			if monitoring.IsLogsDefined(&ls) {
				stats[stackMonitoringLogsCount]++
			}
			if monitoring.IsMetricsDefined(&ls) {
				stats[stackMonitoringMetricsCount]++
			}
		}
	}
	return "logstashes", stats, nil
}

func mapsStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	stats := map[string]int32{resourceCount: 0, podCount: 0}
	var mapsList mapsv1alpha1.ElasticMapsServerList
	for _, ns := range managedNamespaces {
		if err := k8sClient.List(context.Background(), &mapsList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, maps := range mapsList.Items {
			stats[resourceCount]++
			stats[podCount] += maps.Status.AvailableNodes
		}
	}
	return "maps", stats, nil
}

// stackConfigPolicyStats models StackConfigPolicy resources usage statistics.
type stackConfigPolicyStats struct {
	ResourceCount            int `json:"resource_count"`
	ConfiguredResourcesCount int `json:"configured_resources_count"`
	Settings                 struct {
		ClusterSettingsCount           int `json:"cluster_settings_count"`
		SnapshotRepositoriesCount      int `json:"snapshot_repositories_count"`
		SnapshotLifecyclePoliciesCount int `json:"snapshot_lifecycle_policies_count"`
		RoleMappingsCount              int `json:"role_mappings_count"`
		IndexLifecyclePoliciesCount    int `json:"index_lifecycle_policies_count"`
		IngestPipelinesCount           int `json:"ingest_pipelines_count"`
		ComponentTemplatesCount        int `json:"component_templates_count"`
		ComposableIndexTemplatesCount  int `json:"composable_index_templates_count"`
	} `json:"settings"`
}

func scpStats(k8sClient k8s.Client, managedNamespaces []string) (string, interface{}, error) {
	stats := stackConfigPolicyStats{}
	for _, ns := range managedNamespaces {
		var scpList policyv1alpha1.StackConfigPolicyList
		if err := k8sClient.List(context.Background(), &scpList, client.InNamespace(ns)); err != nil {
			return "", nil, err
		}
		for _, scp := range scpList.Items {
			stats.ResourceCount++
			stats.ConfiguredResourcesCount += scp.Status.Resources
			if scp.Spec.Elasticsearch.ClusterSettings != nil {
				stats.Settings.ClusterSettingsCount += len(scp.Spec.Elasticsearch.ClusterSettings.Data)
			}
			if scp.Spec.Elasticsearch.SnapshotRepositories != nil {
				stats.Settings.SnapshotRepositoriesCount += len(scp.Spec.Elasticsearch.SnapshotRepositories.Data)
			}
			if scp.Spec.Elasticsearch.SnapshotLifecyclePolicies != nil {
				stats.Settings.SnapshotLifecyclePoliciesCount += len(scp.Spec.Elasticsearch.SnapshotLifecyclePolicies.Data)
			}
			if scp.Spec.Elasticsearch.SecurityRoleMappings != nil {
				stats.Settings.RoleMappingsCount += len(scp.Spec.Elasticsearch.SecurityRoleMappings.Data)
			}
			if scp.Spec.Elasticsearch.IndexLifecyclePolicies != nil {
				stats.Settings.IndexLifecyclePoliciesCount += len(scp.Spec.Elasticsearch.IndexLifecyclePolicies.Data)
			}
			if scp.Spec.Elasticsearch.IngestPipelines != nil {
				stats.Settings.IngestPipelinesCount += len(scp.Spec.Elasticsearch.IngestPipelines.Data)
			}
			if scp.Spec.Elasticsearch.IndexTemplates.ComponentTemplates != nil {
				stats.Settings.ComponentTemplatesCount += len(scp.Spec.Elasticsearch.IndexTemplates.ComponentTemplates.Data)
			}
			if scp.Spec.Elasticsearch.IndexTemplates.ComposableIndexTemplates != nil {
				stats.Settings.ComposableIndexTemplatesCount += len(scp.Spec.Elasticsearch.IndexTemplates.ComposableIndexTemplates.Data)
			}
		}
	}
	return "stackconfigpolicies", stats, nil
}
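
For context, below is a minimal sketch of how this reporter might be wired into an operator binary. It relies only on the NewReporter and Start signatures defined above; the manager setup, the "elastic-system" namespace, the one-hour interval and the nil tracer (assumed to simply disable APM spans) are illustrative assumptions, not ECK's actual wiring.

// telemetry_wiring_sketch.go — illustrative only; values and wiring are assumptions.
package main

import (
	"time"

	ctrl "sigs.k8s.io/controller-runtime"

	"github.com/elastic/cloud-on-k8s/v3/pkg/about"
	"github.com/elastic/cloud-on-k8s/v3/pkg/telemetry"
)

func main() {
	// Assumption: a controller-runtime manager provides the client.Client
	// expected by NewReporter.
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		panic(err)
	}

	reporter := telemetry.NewReporter(
		about.OperatorInfo{}, // populated from build metadata in the real operator
		mgr.GetClient(),
		"elastic-system", // operator namespace (example value)
		nil,              // empty slice means "manage all namespaces", see NewReporter
		1*time.Hour,      // telemetry interval (example value)
		nil,              // *apm.Tracer; assumption: nil means no tracing spans are recorded
	)

	ctx := ctrl.SetupSignalHandler()
	// Start blocks on its ticker loop, so it runs in its own goroutine.
	go reporter.Start(ctx)

	if err := mgr.Start(ctx); err != nil {
		panic(err)
	}
}

Additional resource kinds would plug into the same report by appending another getStatsFn to the slice iterated in getResourceStats.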