main.go (344 lines of code) (raw):
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
package main
import (
"context"
"crypto/tls"
"encoding/json"
"flag"
"fmt"
"os"
"runtime"
"strings"
"time"
routev1 "github.com/openshift/api/route/v1"
monitoringv1 "github.com/prometheus-operator/prometheus-operator/pkg/apis/monitoring/v1"
"github.com/spf13/pflag"
colfeaturegate "go.opentelemetry.io/collector/featuregate"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
k8sapiflag "k8s.io/component-base/cli/flag"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
"sigs.k8s.io/controller-runtime/pkg/webhook"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
otelv1alpha1 "github.com/aws/amazon-cloudwatch-agent-operator/apis/v1alpha1"
"github.com/aws/amazon-cloudwatch-agent-operator/controllers"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/config"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/version"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/webhook/namespacemutation"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/webhook/podmutation"
"github.com/aws/amazon-cloudwatch-agent-operator/internal/webhook/workloadmutation"
"github.com/aws/amazon-cloudwatch-agent-operator/pkg/featuregate"
"github.com/aws/amazon-cloudwatch-agent-operator/pkg/instrumentation"
"github.com/aws/amazon-cloudwatch-agent-operator/pkg/instrumentation/auto"
"github.com/aws/amazon-cloudwatch-agent-operator/pkg/sidecar"
// +kubebuilder:scaffold:imports
)
// Default container image repositories. main combines each repository with the
// matching version reported by version.Get() as "<repository>:<version>" to
// build the default value of the corresponding image flag.
const (
	// CloudWatch agent and its target allocator.
	cloudwatchAgentImageRepository = "public.ecr.aws/cloudwatch-agent/cloudwatch-agent"
	targetAllocatorImageRepository = "public.ecr.aws/cloudwatch-agent/cloudwatch-agent-target-allocator"

	// Auto-instrumentation images, one per supported language.
	autoInstrumentationJavaImageRepository   = "public.ecr.aws/aws-observability/adot-autoinstrumentation-java"
	autoInstrumentationPythonImageRepository = "public.ecr.aws/aws-observability/adot-autoinstrumentation-python"
	autoInstrumentationDotNetImageRepository = "ghcr.io/open-telemetry/opentelemetry-operator/autoinstrumentation-dotnet"
	autoInstrumentationNodeJSImageRepository = "ghcr.io/open-telemetry/opentelemetry-operator/autoinstrumentation-nodejs"

	// DCGM exporter and Neuron monitor.
	dcgmExporterImageRepository  = "nvcr.io/nvidia/k8s/dcgm-exporter"
	neuronMonitorImageRepository = "public.ecr.aws/neuron"
)
var (
// scheme is the runtime scheme that init populates and that main passes to
// the controller manager options.
scheme = k8sruntime.NewScheme()
// setupLog is the named logger used for start-up and shutdown messages in this file.
setupLog = ctrl.Log.WithName("setup")
)
// tlsConfig carries the TLS settings that tlsConfigSetting applies to the
// webhook server's tls.Config.
type tlsConfig struct {
// minVersion is the TLS version name handed to k8sapiflag.TLSVersion.
minVersion string
// cipherSuites lists the cipher suite names handed to k8sapiflag.TLSCipherSuites.
cipherSuites []string
}
// init adds every API group used by the operator to the package-level scheme.
// Each registration result is passed to utilruntime.Must. The statements are
// kept in this form because generated registrations are expected to be added
// above the scaffold marker below.
func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(otelv1alpha1.AddToScheme(scheme))
utilruntime.Must(routev1.AddToScheme(scheme))
utilruntime.Must(monitoringv1.AddToScheme(scheme))
// +kubebuilder:scaffold:scheme
}
// stringFlagOrEnv registers the string flag `name` on the global pflag set and
// stores its value in p. When the environment variable envName holds a
// non-empty value, that value replaces defaultValue as the flag's default, so
// the effective precedence is: explicit flag, then environment variable, then
// defaultValue.
func stringFlagOrEnv(p *string, name string, envName string, defaultValue string, usage string) {
	fallback := defaultValue
	if fromEnv := os.Getenv(envName); fromEnv != "" {
		fallback = fromEnv
	}
	pflag.StringVar(p, name, fallback, usage)
}
// setLangEnvVarsForResource exports the entries of resource as environment
// variables for the language langStr. Only keys that are present are exported:
//
//	"cpu"     -> AUTO_INSTRUMENTATION_<langStr>_CPU_<resourceStr>
//	"memory"  -> AUTO_INSTRUMENTATION_<langStr>_MEM_<resourceStr>
//	"enabled" -> AUTO_INSTRUMENTATION_<langStr>_RUNTIME_ENABLED
//
// The "enabled" variable name does not include resourceStr. Errors returned by
// os.Setenv are not checked.
func setLangEnvVarsForResource(langStr string, resourceStr string, resource map[string]string) {
	prefix := "AUTO_INSTRUMENTATION_" + langStr
	exports := [...]struct{ key, envName string }{
		{"cpu", prefix + "_CPU_" + resourceStr},
		{"memory", prefix + "_MEM_" + resourceStr},
		{"enabled", prefix + "_RUNTIME_ENABLED"},
	}
	for _, export := range exports {
		value, present := resource[export.key]
		if !present {
			continue
		}
		os.Setenv(export.envName, value)
	}
}
// setLangEnvVars exports the auto-instrumentation settings in cfg for the
// language langStr. The "limits", "requests" and "runtime_metrics" sections are
// handled in that order, and each present section is forwarded to
// setLangEnvVarsForResource with the suffix "LIMIT", "REQUEST" or "" respectively.
func setLangEnvVars(langStr string, cfg map[string]map[string]string) {
	sections := [...]struct{ key, suffix string }{
		{"limits", "LIMIT"},
		{"requests", "REQUEST"},
		{"runtime_metrics", ""},
	}
	for _, section := range sections {
		values, present := cfg[section.key]
		if !present {
			continue
		}
		setLangEnvVarsForResource(langStr, section.suffix, values)
	}
}
// main is the operator's entry point. It registers and parses the command-line
// flags, exports the auto-instrumentation defaults as environment variables,
// builds the controller manager, wires the reconcilers and admission webhooks
// into it, and finally starts the manager. A failure while creating the
// manager, a controller, a webhook or a health check is logged through
// setupLog and ends the process with exit status 1.
func main() {
	// registers any flags that underlying libraries might use
	opts := zap.Options{}
	flagset := featuregate.Flags(colfeaturegate.GlobalRegistry())
	opts.BindFlags(flag.CommandLine)
	pflag.CommandLine.AddGoFlagSet(flag.CommandLine)
	pflag.CommandLine.AddGoFlagSet(flagset)

	v := version.Get()

	// add flags related to this operator
	//
	// NOTE(review): webhookPort and tlsOpt are never bound to a flag in this
	// function, so their zero values are what reach the webhook server options
	// below. Confirm that relying on the library defaults is intended.
	var (
		metricsAddr                  string
		probeAddr                    string
		pprofAddr                    string
		agentImage                   string
		autoInstrumentationJava      string
		autoInstrumentationPython    string
		autoInstrumentationDotNet    string
		autoInstrumentationNodeJS    string
		autoAnnotationConfigStr      string
		autoInstrumentationConfigStr string
		webhookPort                  int
		tlsOpt                       tlsConfig
		dcgmExporterImage            string
		neuronMonitorImage           string
		targetAllocatorImage         string
	)

	pflag.StringVar(&metricsAddr, "metrics-addr", ":8080", "The address the metric endpoint binds to.")
	pflag.StringVar(&probeAddr, "health-probe-addr", ":8081", "The address the probe endpoint binds to.")
	pflag.StringVar(&pprofAddr, "pprof-addr", "", "The address to expose the pprof server. Default is empty string which disables the pprof server.")
	stringFlagOrEnv(&agentImage, "agent-image", "RELATED_IMAGE_COLLECTOR", fmt.Sprintf("%s:%s", cloudwatchAgentImageRepository, v.AmazonCloudWatchAgent), "The default CloudWatch Agent image. This image is used when no image is specified in the CustomResource.")
	stringFlagOrEnv(&autoInstrumentationJava, "auto-instrumentation-java-image", "RELATED_IMAGE_AUTO_INSTRUMENTATION_JAVA", fmt.Sprintf("%s:%s", autoInstrumentationJavaImageRepository, v.AutoInstrumentationJava), "The default OpenTelemetry Java instrumentation image. This image is used when no image is specified in the CustomResource.")
	stringFlagOrEnv(&autoInstrumentationPython, "auto-instrumentation-python-image", "RELATED_IMAGE_AUTO_INSTRUMENTATION_PYTHON", fmt.Sprintf("%s:%s", autoInstrumentationPythonImageRepository, v.AutoInstrumentationPython), "The default OpenTelemetry Python instrumentation image. This image is used when no image is specified in the CustomResource.")
	stringFlagOrEnv(&autoInstrumentationDotNet, "auto-instrumentation-dotnet-image", "RELATED_IMAGE_AUTO_INSTRUMENTATION_DOTNET", fmt.Sprintf("%s:%s", autoInstrumentationDotNetImageRepository, v.AutoInstrumentationDotNet), "The default OpenTelemetry Dotnet instrumentation image. This image is used when no image is specified in the CustomResource.")
	stringFlagOrEnv(&autoInstrumentationNodeJS, "auto-instrumentation-nodejs-image", "RELATED_IMAGE_AUTO_INSTRUMENTATION_NODEJS", fmt.Sprintf("%s:%s", autoInstrumentationNodeJSImageRepository, v.AutoInstrumentationNodeJS), "The default OpenTelemetry NodeJS instrumentation image. This image is used when no image is specified in the CustomResource.")
	stringFlagOrEnv(&autoAnnotationConfigStr, "auto-annotation-config", "AUTO_ANNOTATION_CONFIG", "", "The configuration for auto-annotation.")
	pflag.StringVar(&autoInstrumentationConfigStr, "auto-instrumentation-config", "", "The configuration for auto-instrumentation.")
	stringFlagOrEnv(&dcgmExporterImage, "dcgm-exporter-image", "RELATED_IMAGE_DCGM_EXPORTER", fmt.Sprintf("%s:%s", dcgmExporterImageRepository, v.DcgmExporter), "The default DCGM Exporter image. This image is used when no image is specified in the CustomResource.")
	stringFlagOrEnv(&neuronMonitorImage, "neuron-monitor-image", "RELATED_IMAGE_NEURON_MONITOR", fmt.Sprintf("%s:%s", neuronMonitorImageRepository, v.NeuronMonitor), "The default Neuron monitor image. This image is used when no image is specified in the CustomResource.")
	stringFlagOrEnv(&targetAllocatorImage, "target-allocator-image", "RELATED_IMAGE_TARGET_ALLOCATOR", fmt.Sprintf("%s:%s", targetAllocatorImageRepository, v.TargetAllocator), "The default AmazonCloudWatchAgent target allocator image. This image is used when no image is specified in the CustomResource.")
	pflag.Parse()

	// Install the logger right after flag parsing. ctrl.Log only delegates to a
	// real logger once SetLogger has been called, so setupLog messages emitted
	// while the configuration below is processed would otherwise be lost.
	logger := zap.New(zap.UseFlagOptions(&opts))
	ctrl.SetLogger(logger)

	// set instrumentation cpu and memory limits in environment variables to be used for default instrumentation;
	// default values received from https://github.com/open-telemetry/opentelemetry-operator/blob/main/apis/v1alpha1/instrumentation_webhook.go
	autoInstrumentationConfig := map[string]map[string]map[string]string{
		"java": {
			"limits":          {"cpu": "500m", "memory": "64Mi"},
			"requests":        {"cpu": "50m", "memory": "64Mi"},
			"runtime_metrics": {"enabled": "true"},
		},
		"python": {
			"limits":          {"cpu": "500m", "memory": "32Mi"},
			"requests":        {"cpu": "50m", "memory": "32Mi"},
			"runtime_metrics": {"enabled": "true"},
		},
		"dotnet": {
			"limits":          {"cpu": "500m", "memory": "128Mi"},
			"requests":        {"cpu": "50m", "memory": "128Mi"},
			"runtime_metrics": {"enabled": "true"},
		},
		"nodejs": {
			"limits":   {"cpu": "500m", "memory": "128Mi"},
			"requests": {"cpu": "50m", "memory": "128Mi"},
		},
	}
	// The flag value, when it parses, is decoded on top of the defaults above.
	if err := json.Unmarshal([]byte(autoInstrumentationConfigStr), &autoInstrumentationConfig); err != nil {
		if autoInstrumentationConfigStr != "" {
			// An empty flag is the normal "use the defaults" case; a non-empty
			// value that fails to parse is a misconfiguration worth surfacing.
			setupLog.Error(err, "Unable to unmarshal auto-instrumentation config")
		}
		setupLog.Info(fmt.Sprintf("Using default values: %v", autoInstrumentationConfig))
	}
	if javaVar, ok := autoInstrumentationConfig["java"]; ok {
		setLangEnvVars("JAVA", javaVar)
	}
	if pythonVar, ok := autoInstrumentationConfig["python"]; ok {
		setLangEnvVars("PYTHON", pythonVar)
	}
	if dotNetVar, ok := autoInstrumentationConfig["dotnet"]; ok {
		setLangEnvVars("DOTNET", dotNetVar)
	}
	if nodeJSVar, ok := autoInstrumentationConfig["nodejs"]; ok {
		setLangEnvVars("NODEJS", nodeJSVar)
	}

	// set supported language instrumentation images in environment variable to be used for default instrumentation
	os.Setenv("AUTO_INSTRUMENTATION_JAVA", autoInstrumentationJava)
	os.Setenv("AUTO_INSTRUMENTATION_PYTHON", autoInstrumentationPython)
	os.Setenv("AUTO_INSTRUMENTATION_DOTNET", autoInstrumentationDotNet)
	os.Setenv("AUTO_INSTRUMENTATION_NODEJS", autoInstrumentationNodeJS)

	logger.Info("Starting the Amazon CloudWatch Agent Operator",
		"amazon-cloudwatch-agent-operator", v.Operator,
		"cloudwatch-agent", agentImage,
		"auto-instrumentation-java", autoInstrumentationJava,
		"auto-instrumentation-python", autoInstrumentationPython,
		"auto-instrumentation-dotnet", autoInstrumentationDotNet,
		"auto-instrumentation-nodejs", autoInstrumentationNodeJS,
		"dcgm-exporter", dcgmExporterImage,
		"neuron-monitor", neuronMonitorImage,
		"amazon-cloudwatch-agent-target-allocator", targetAllocatorImage,
		"build-date", v.BuildDate,
		"go-version", v.Go,
		"go-arch", runtime.GOARCH,
		"go-os", runtime.GOOS,
	)

	cfg := config.New(
		config.WithLogger(ctrl.Log.WithName("config")),
		config.WithVersion(v),
		config.WithCollectorImage(agentImage),
		config.WithAutoInstrumentationJavaImage(autoInstrumentationJava),
		config.WithAutoInstrumentationPythonImage(autoInstrumentationPython),
		config.WithAutoInstrumentationDotNetImage(autoInstrumentationDotNet),
		config.WithAutoInstrumentationNodeJSImage(autoInstrumentationNodeJS),
		config.WithDcgmExporterImage(dcgmExporterImage),
		config.WithNeuronMonitorImage(neuronMonitorImage),
		config.WithTargetAllocatorImage(targetAllocatorImage),
	)

	// Translate WATCH_NAMESPACE into the cache's namespace restriction. Every
	// non-empty entry is applied, including a single namespace without a comma:
	// restricting only comma-separated lists would log a lone namespace as
	// watched while the manager still watched every namespace. The map stays
	// nil when no namespace is named, which is the "watch all namespaces" case.
	var namespaces map[string]cache.Config
	watchNamespace, found := os.LookupEnv("WATCH_NAMESPACE")
	if found {
		for _, ns := range strings.Split(watchNamespace, ",") {
			if ns = strings.TrimSpace(ns); ns == "" {
				continue
			}
			if namespaces == nil {
				namespaces = map[string]cache.Config{}
			}
			namespaces[ns] = cache.Config{}
		}
	}
	switch {
	case len(namespaces) > 0:
		setupLog.Info("watching namespace(s)", "namespaces", watchNamespace)
	case found:
		setupLog.Info("the env var WATCH_NAMESPACE is empty, watching all namespaces")
	default:
		setupLog.Info("the env var WATCH_NAMESPACE isn't set, watching all namespaces")
	}

	optionsTlSOptsFuncs := []func(*tls.Config){
		func(config *tls.Config) { tlsConfigSetting(config, tlsOpt) },
	}

	mgrOptions := ctrl.Options{
		Scheme: scheme,
		Metrics: metricsserver.Options{
			BindAddress: metricsAddr,
		},
		HealthProbeBindAddress: probeAddr,
		PprofBindAddress:       pprofAddr,
		WebhookServer: webhook.NewServer(webhook.Options{
			Port:    webhookPort,
			TLSOpts: optionsTlSOptsFuncs,
		}),
		Cache: cache.Options{
			DefaultNamespaces: namespaces,
		},
	}

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), mgrOptions)
	if err != nil {
		setupLog.Error(err, "unable to start manager")
		os.Exit(1)
	}

	ctx := ctrl.SetupSignalHandler()

	if err = controllers.NewReconciler(controllers.Params{
		Client:   mgr.GetClient(),
		Log:      ctrl.Log.WithName("controllers").WithName("AmazonCloudWatchAgent"),
		Scheme:   mgr.GetScheme(),
		Config:   cfg,
		Recorder: mgr.GetEventRecorderFor("amazon-cloudwatch-agent-operator"),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "AmazonCloudWatchAgent")
		os.Exit(1)
	}

	if err = controllers.NewDcgmExporterReconciler(controllers.Params{
		Client:   mgr.GetClient(),
		Log:      ctrl.Log.WithName("controllers").WithName("DcgmExporter"),
		Scheme:   mgr.GetScheme(),
		Config:   cfg,
		Recorder: mgr.GetEventRecorderFor("amazon-cloudwatch-agent-operator"),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "DcgmExporter")
		os.Exit(1)
	}

	if err = controllers.NewNeuronMonitorReconciler(controllers.Params{
		Client:   mgr.GetClient(),
		Log:      ctrl.Log.WithName("controllers").WithName("NeuronMonitor"),
		Scheme:   mgr.GetScheme(),
		Config:   cfg,
		Recorder: mgr.GetEventRecorderFor("amazon-cloudwatch-agent-operator"),
	}).SetupWithManager(mgr); err != nil {
		setupLog.Error(err, "unable to create controller", "controller", "NeuronMonitor")
		os.Exit(1)
	}

	decoder := admission.NewDecoder(mgr.GetScheme())

	// Auto-annotation is skipped when it is explicitly disabled or when no
	// configuration was supplied. An unparsable configuration is logged and
	// also leaves auto-annotation off rather than stopping the operator.
	if os.Getenv("DISABLE_AUTO_ANNOTATION") == "true" || autoAnnotationConfigStr == "" {
		setupLog.Info("Auto-annotation is disabled")
	} else {
		var autoAnnotationConfig auto.AnnotationConfig
		if err = json.Unmarshal([]byte(autoAnnotationConfigStr), &autoAnnotationConfig); err != nil {
			setupLog.Error(err, "Unable to unmarshal auto-annotation config")
		} else {
			autoAnnotationMutators := auto.NewAnnotationMutators(
				mgr.GetClient(),
				mgr.GetAPIReader(),
				logger,
				autoAnnotationConfig,
				instrumentation.NewTypeSet(
					instrumentation.TypeJava,
					instrumentation.TypePython,
					instrumentation.TypeDotNet,
					instrumentation.TypeNodeJS,
				),
			)
			mgr.GetWebhookServer().Register("/mutate-v1-workload", &webhook.Admission{
				Handler: workloadmutation.NewWebhookHandler(decoder, autoAnnotationMutators)})
			mgr.GetWebhookServer().Register("/mutate-v1-namespace", &webhook.Admission{
				Handler: namespacemutation.NewWebhookHandler(decoder, autoAnnotationMutators),
			})
			setupLog.Info("Auto-annotation is enabled")
			// Existing resources are patched in the background once the
			// webhook server reports that it has started.
			go waitForWebhookServerStart(
				ctx,
				mgr.GetWebhookServer().StartedChecker(),
				func(ctx context.Context) {
					setupLog.Info("Applying auto-annotation")
					autoAnnotationMutators.MutateAndPatchAll(ctx)
				},
			)
		}
	}

	// Webhooks are registered unless ENABLE_WEBHOOKS is exactly "false".
	if os.Getenv("ENABLE_WEBHOOKS") != "false" {
		if err = otelv1alpha1.SetupCollectorWebhook(mgr, cfg); err != nil {
			setupLog.Error(err, "unable to create webhook", "webhook", "AmazonCloudWatchAgent")
			os.Exit(1)
		}
		if err = otelv1alpha1.SetupInstrumentationWebhook(mgr, cfg); err != nil {
			setupLog.Error(err, "unable to create webhook", "webhook", "Instrumentation")
			os.Exit(1)
		}

		mgr.GetWebhookServer().Register("/mutate-v1-pod", &webhook.Admission{
			Handler: podmutation.NewWebhookHandler(cfg, ctrl.Log.WithName("pod-webhook"), decoder, mgr.GetClient(),
				[]podmutation.PodMutator{
					sidecar.NewMutator(logger, cfg, mgr.GetClient()),
					instrumentation.NewMutator(logger, mgr.GetClient(), mgr.GetEventRecorderFor("amazon-cloudwatch-agent-operator")),
				}),
		})
	} else {
		ctrl.Log.Info("Webhooks are disabled, operator is running an unsupported mode", "ENABLE_WEBHOOKS", "false")
	}
	// +kubebuilder:scaffold:builder

	if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up health check")
		os.Exit(1)
	}
	if err := mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil {
		setupLog.Error(err, "unable to set up ready check")
		os.Exit(1)
	}

	setupLog.Info("starting manager")
	if err := mgr.Start(ctx); err != nil {
		setupLog.Error(err, "problem running manager")
		os.Exit(1)
	}
}
// waitForWebhookServerStart polls checker once per second. The first time
// checker returns a nil error it logs that the webhook server has started,
// invokes callback with ctx, and returns. It returns without invoking callback
// if ctx is done first. checker is always called with a nil request.
func waitForWebhookServerStart(ctx context.Context, checker healthz.Checker, callback func(context.Context)) {
	poll := time.NewTicker(time.Second)
	defer poll.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-poll.C:
		}

		if checker(nil) != nil {
			// Not started yet; try again on the next tick.
			continue
		}
		setupLog.Info("Webhook server has started")
		callback(ctx)
		return
	}
}
// tlsConfigSetting applies the TLS options in tlsOpt to cfg, the tls.Config of
// the webhook server. The minimum version name and the cipher suite names are
// converted to their numeric IDs with the k8sapiflag helpers
// (see https://pkg.go.dev/k8s.io/component-base/cli/flag).
//
// A conversion error is logged through setupLog but does not stop the
// assignment: cfg.MinVersion and cfg.CipherSuites are always overwritten with
// whatever the helpers returned.
func tlsConfigSetting(cfg *tls.Config, tlsOpt tlsConfig) {
	// TLSVersion maps a version name to its TLS version ID.
	minVersion, versionErr := k8sapiflag.TLSVersion(tlsOpt.minVersion)
	if versionErr != nil {
		setupLog.Error(versionErr, "TLS version invalid")
	}

	// TLSCipherSuites maps cipher suite names to their IDs.
	suiteIDs, suitesErr := k8sapiflag.TLSCipherSuites(tlsOpt.cipherSuites)
	if suitesErr != nil {
		setupLog.Error(suitesErr, "Failed to convert TLS cipher suite name to ID")
	}

	cfg.MinVersion, cfg.CipherSuites = minVersion, suiteIDs
}