in pkg/operator/operator.go [184:333]
func New(logger logr.Logger, clientConfig *rest.Config, opts Options) (*Operator, error) {
if err := opts.defaultAndValidate(logger); err != nil {
return nil, fmt.Errorf("invalid options: %w", err)
}
sc, err := NewScheme()
if err != nil {
return nil, fmt.Errorf("unable to initialize Kubernetes scheme: %w", err)
}
host, portStr, err := net.SplitHostPort(opts.ListenAddr)
if err != nil {
return nil, fmt.Errorf("invalid listen address: %w", err)
}
port, err := strconv.Atoi(portStr)
if err != nil {
return nil, fmt.Errorf("invalid port: %w", err)
}
watchObjects := map[client.Object]cache.ByObject{
&corev1.Pod{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.namespace": opts.OperatorNamespace}),
},
&monitoringv1.PodMonitoring{}: {
Field: fields.Everything(),
},
&monitoringv1.ClusterPodMonitoring{}: {
Field: fields.Everything(),
},
&monitoringv1.ClusterNodeMonitoring{}: {
Field: fields.Everything(),
},
&monitoringv1.GlobalRules{}: {
Field: fields.Everything(),
},
&monitoringv1.ClusterRules{}: {
Field: fields.Everything(),
},
&monitoringv1.Rules{}: {
Field: fields.Everything(),
},
&corev1.Secret{}: {
Namespaces: map[string]cache.Config{
opts.OperatorNamespace: {},
opts.PublicNamespace: {},
},
},
&monitoringv1.OperatorConfig{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.namespace": opts.PublicNamespace}),
},
&corev1.Service{}: {
Field: fields.SelectorFromSet(fields.Set{
"metadata.namespace": opts.OperatorNamespace,
"metadata.name": NameAlertmanager,
}),
},
&corev1.ConfigMap{}: {
Field: fields.SelectorFromSet(fields.Set{"metadata.namespace": opts.OperatorNamespace}),
},
&appsv1.DaemonSet{}: {
Field: fields.SelectorFromSet(fields.Set{
"metadata.namespace": opts.OperatorNamespace,
"metadata.name": NameCollector,
}),
},
&appsv1.Deployment{}: {
Field: fields.SelectorFromSet(fields.Set{
"metadata.namespace": opts.OperatorNamespace,
"metadata.name": NameRuleEvaluator,
}),
},
&appsv1.StatefulSet{}: {
Field: fields.SelectorFromSet(fields.Set{
"metadata.namespace": opts.OperatorNamespace,
"metadata.name": NameAlertmanager,
}),
},
}
// Determine whether VPA is installed in the cluster. If so, set up the scaling controller.
var vpaAvailable bool
coreClientConfig := rest.CopyConfig(clientConfig)
coreClientConfig.ContentType = runtime.ContentTypeProtobuf
clientset, err := apiextensions.NewForConfig(coreClientConfig)
if err != nil {
return nil, fmt.Errorf("create clientset: %w", err)
}
if _, err := clientset.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), "verticalpodautoscalers.autoscaling.k8s.io", metav1.GetOptions{}); err != nil {
logger.Info("vertical pod autoscaling is not available, scaling.vpa.enabled option on the OperatorConfig will not work")
} else {
logger.Info("vertical pod autoscaling available, monitoring OperatorConfig for scaling.vpa.enabled option")
vpaAvailable = true
watchObjects[&autoscalingv1.VerticalPodAutoscaler{}] = cache.ByObject{
Field: fields.SelectorFromSet(fields.Set{
"metadata.namespace": opts.OperatorNamespace,
}),
}
}
manager, err := ctrl.NewManager(clientConfig, manager.Options{
Logger: logger,
Scheme: sc,
WebhookServer: webhook.NewServer(webhook.Options{
Host: host,
Port: port,
CertDir: opts.CertDir,
}),
// Don't run a metrics server with the manager. Metrics are being served.
// explicitly in the main routine.
Metrics: metricsserver.Options{
BindAddress: "0",
},
HealthProbeBindAddress: opts.ProbeAddr,
// Manage cluster-wide and namespace resources at the same time.
NewCache: cache.NewCacheFunc(func(_ *rest.Config, options cache.Options) (cache.Cache, error) {
return cache.New(clientConfig, cache.Options{
Scheme: options.Scheme,
// The presence of metadata.namespace has special handling internally causing the
// cache's watch-list to only watch that namespace.
ByObject: watchObjects,
})
}),
})
if err != nil {
return nil, fmt.Errorf("create controller manager: %w", err)
}
webhookChecker := manager.GetWebhookServer().StartedChecker()
if err := manager.AddHealthzCheck("webhooks", webhookChecker); err != nil {
return nil, fmt.Errorf("add healthz check for webhooks: %w", err)
}
if err := manager.AddReadyzCheck("webhooks", webhookChecker); err != nil {
return nil, fmt.Errorf("add readyz check for webhooks: %w", err)
}
client, err := client.New(clientConfig, client.Options{Scheme: sc})
if err != nil {
return nil, fmt.Errorf("create client: %w", err)
}
op := &Operator{
logger: logger,
opts: opts,
client: client,
manager: manager,
vpaAvailable: vpaAvailable,
}
return op, nil
}