main.go (146 lines of code) (raw):

// Licensed to Elasticsearch B.V. under one or more contributor // license agreements. See the NOTICE.txt file distributed with // this work for additional information regarding copyright // ownership. Elasticsearch B.V. licenses this file to you under // the Apache License, Version 2.0 (the "License"); you may // not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, // software distributed under the License is distributed on an // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. package main import ( "context" "flag" "fmt" "os" "github.com/go-logr/logr" // Load all auth plugins _ "k8s.io/client-go/plugin/pkg/client/auth" openapinamer "k8s.io/apiserver/pkg/endpoints/openapi" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/client-go/kubernetes" "k8s.io/component-base/logs" "sigs.k8s.io/custom-metrics-apiserver/pkg/apiserver" basecmd "sigs.k8s.io/custom-metrics-apiserver/pkg/cmd" "go.elastic.co/apm/v2" _ "github.com/KimMachineGun/automemlimit" generatedopenapi "github.com/elastic/elasticsearch-k8s-metrics-adapter/generated/openapi" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client/custom_api" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/client/elasticsearch" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/config" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/log" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/monitoring" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/profiling" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/provider" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/registry" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/scheduler" "github.com/elastic/elasticsearch-k8s-metrics-adapter/pkg/tracing" ) const ( serviceType = "elasticsearch-k8s-metrics-adapter" elastisearchMetricServerType = "elasticsearch" customMetricServerType = "custom" ) var ( serviceVersion string logger logr.Logger ) func main() { cmd := &ElasticsearchAdapter{} cmd.OpenAPIConfig = genericapiserver.DefaultOpenAPIConfig(generatedopenapi.GetOpenAPIDefinitions, openapinamer.NewDefinitionNamer(apiserver.Scheme)) cmd.OpenAPIConfig.Info.Title = serviceType cmd.OpenAPIConfig.Info.Version = serviceVersion logs.AddFlags(cmd.Flags()) cmd.Flags().BoolVar(&cmd.Insecure, "insecure", false, "if true authentication and authorization are disabled, only to be used in dev mode") cmd.Flags().IntVar(&cmd.MonitoringPort, "monitoring-port", 9090, "port to expose readiness and Prometheus metrics") cmd.Flags().IntVar(&cmd.ProfilingPort, "profiling-port", 0, "port to expose pprof profiling") cmd.Flags().AddGoFlagSet(flag.CommandLine) // make sure we get the klog flags err := cmd.Flags().Parse(os.Args) if err != nil { logErrorAndExit(err, "Unable to parse flags") } flushLogs := log.Configure(cmd.Flags(), serviceType, serviceVersion) defer flushLogs() logger = log.ForPackage("main") adapterCfg, err := config.Parse() if err != nil { logErrorAndExit(err, "Unable to parse adapter configuration") } logger.Info("Starting monitoring server...") monitoringServer := monitoring.NewServer(adapterCfg.MetricServers, cmd.MonitoringPort, adapterCfg.ReadinessProbe.FailureThreshold) go monitoringServer.Start() if cmd.ProfilingPort > 0 { logger.Info("Starting profiling server...") go profiling.StartProfiling(cmd.ProfilingPort) } apmTracer, err := apm.NewTracer(serviceType, serviceVersion) if err != nil { logErrorAndExit(err, "Unable to create APM tracer") } apmTracer.SetLogger(&tracing.Logger{}) metricsClients, err := cmd.newMetricsClients(adapterCfg, apmTracer) if err != nil { logErrorAndExit(err, "Unable to create metrics provider") } scheduler := scheduler.NewScheduler(metricsClients...) metricsRegistry := registry.NewRegistry() scheduler. WithMetricListeners(monitoringServer, metricsRegistry). WithErrorListeners(monitoringServer). Start(). WaitInitialSync() aggProvider := provider.NewAggregationProvider(metricsRegistry, apmTracer) cmd.WithCustomMetrics(aggProvider) cmd.WithExternalMetrics(aggProvider) if cmd.Insecure { cmd.Authentication = nil cmd.Authorization = nil } logger.Info("Starting elastic k8s metrics adapter...") if err := cmd.Run(context.WithoutCancel(context.Background())); err != nil { logErrorAndExit(err, "Unable to run elastic k8s metrics adapter") } } type ElasticsearchAdapter struct { basecmd.AdapterBase Insecure bool PrometheusMetricsEnabled bool MonitoringPort int ProfilingPort int } func (a *ElasticsearchAdapter) newMetricsClients(adapterCfg *config.Config, tracer *apm.Tracer) ([]client.Interface, error) { dynamicClient, err := a.DynamicClient() if err != nil { return nil, fmt.Errorf("unable to construct dynamic dynamicClient: %w", err) } mapper, err := a.RESTMapper() if err != nil { return nil, fmt.Errorf("unable to construct dynamicClient REST mapper: %w", err) } var clients []client.Interface for _, clientCfg := range adapterCfg.MetricServers { switch clientCfg.ServerType { case elastisearchMetricServerType: esMetricClient, err := elasticsearch.NewElasticsearchClient( clientCfg, dynamicClient, mapper, tracer, ) if err != nil { return nil, fmt.Errorf("unable to construct Elasticsearch dynamicClient: %w", err) } clients = append(clients, esMetricClient) case customMetricServerType: kubeClientCfg, err := a.ClientConfig() if err != nil { return nil, fmt.Errorf("unable to construct Kubernetes dynamicClient config: %w", err) } kubeClient, err := kubernetes.NewForConfig(kubeClientCfg) if err != nil { return nil, fmt.Errorf("unable to construct Kubernetes dynamicClient: %w", err) } metricApiClient, err := custom_api.NewMetricApiClientProvider(kubeClientCfg, mapper).NewClient(kubeClient, clientCfg) if err != nil { return nil, fmt.Errorf("unable to construct Kubernetes custom metric API dynamicClient: %w", err) } clients = append(clients, metricApiClient) } } return clients, nil } func logErrorAndExit(err error, msg string) { logger.Error(err, msg) os.Exit(1) }