in pkg/cmd/operator/operator.go [84:237]
// Run configures and starts the operator, blocking in mgr.Start until the
// manager stops.
//
// In order, it parses the command-line flags, configures logging from the
// LOG_LEVEL environment variable, builds the Kubernetes client and the
// controller manager, registers the health check, the API types and the
// controllers, installs the optional operator resources, optionally starts the
// synthetic Integration manager, and finally starts the manager.
//
// Parameters:
//   - healthPort: port the manager's health probe endpoint binds to.
//   - monitoringPort: port the manager's metrics endpoint binds to.
//   - leaderElection: whether leader election is requested; it is forced to
//     false when no namespace can be determined for it.
//   - leaderElectionID: identifier passed to the manager as LeaderElectionID.
//
// Fatal conditions are reported through exitOnError, which is defined
// elsewhere (presumably it terminates the process on a non-nil error).
func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) {
	flag.Parse()

	// The logger instantiated here can be changed to any logger
	// implementing the logr.Logger interface. This logger will
	// be propagated through the whole operator, generating
	// uniform and structured logs.
	// The constants specified here are zap specific.
	//
	// LOG_LEVEL accepts "error", "info" or "debug" (case-insensitive), or an
	// integer level. When the variable is unset the level defaults to info.
	var logLevel zapcore.Level
	logLevelVal, ok := os.LookupEnv("LOG_LEVEL")
	if ok {
		lowered := strings.ToLower(logLevelVal)
		switch lowered {
		case "error":
			logLevel = zapcore.ErrorLevel
		case "info":
			logLevel = zapcore.InfoLevel
		case "debug":
			logLevel = zapcore.DebugLevel
		default:
			customLevel, err := strconv.Atoi(lowered)
			exitOnError(err, "Invalid log-level")
			int8Lev, err := util.IToInt8(customLevel)
			exitOnError(err, "Invalid log-level")
			// Need to multiply by -1 to turn logr expected level into zap level
			logLevel = zapcore.Level(*int8Lev * -1)
		}
	} else {
		logLevel = zapcore.InfoLevel
	}

	// Use and set atomic level that all following log events are compared with
	// in order to evaluate if a given log level on the event is enabled.
	logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) {
		o.Development = false
		o.Level = zap.NewAtomicLevelAt(logLevel)
	}))
	klog.SetLogger(log.AsLogger())

	// The variadic arguments must be expanded with "a..." so that each value
	// is matched against its own verb in the format string; passing the slice
	// itself would render it as a single bracketed argument.
	_, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a...)) }))
	if err != nil {
		// Not fatal: the operator keeps running with the current GOMAXPROCS.
		log.Error(err, "failed to set GOMAXPROCS from cgroups")
	}

	printVersion()

	watchNamespace, err := getWatchNamespace()
	exitOnError(err, "failed to get watch namespace")

	ctx := signals.SetupSignalHandler()

	cfg, err := config.GetConfig()
	exitOnError(err, "cannot get client config")
	// Increase maximum burst that is used by client-side throttling,
	// to prevent the requests made to apply the bundled Kamelets
	// from being throttled.
	cfg.QPS = 20
	cfg.Burst = 200

	bootstrapClient, err := client.NewClientWithConfig(false, cfg)
	exitOnError(err, "cannot initialize client")

	operatorNamespace := platform.GetOperatorNamespace()
	if operatorNamespace == "" {
		// Fallback to using the watch namespace when the operator is not in-cluster.
		// It does not support local (off-cluster) operator watching resources globally,
		// in which case it's not possible to determine a namespace.
		operatorNamespace = watchNamespace
		if operatorNamespace == "" {
			leaderElection = false
			log.Info("unable to determine namespace for leader election")
		}
	}

	// Set the operator container image if it runs in-container
	platform.OperatorImage, err = getOperatorImage(ctx, bootstrapClient)
	exitOnError(err, "cannot get operator container image")

	if !leaderElection {
		log.Info("Leader election is disabled!")
	}

	// Restrict the cached Pods, Deployments, Jobs (and, when available,
	// Knative Services and CronJobs) to the objects carrying the Integration label.
	hasIntegrationLabel, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{})
	exitOnError(err, "cannot create Integration label selector")
	labelsSelector := labels.NewSelector().Add(*hasIntegrationLabel)

	selector := cache.ByObject{
		Label: labelsSelector,
	}
	if !platform.IsCurrentOperatorGlobal() {
		// A non-global operator additionally narrows the selection by namespace.
		selector = cache.ByObject{
			Label:      labelsSelector,
			Namespaces: getNamespacesSelector(operatorNamespace, watchNamespace),
		}
	}

	selectors := map[ctrl.Object]cache.ByObject{
		&corev1.Pod{}:        selector,
		&appsv1.Deployment{}: selector,
		&batchv1.Job{}:       selector,
	}

	// Optional APIs are only added to the cache when they are installed. A
	// failing lookup is logged and treated as "not installed" so that startup
	// remains best-effort instead of hiding the error.
	if installed, err := kubernetes.IsAPIResourceInstalled(bootstrapClient, servingv1.SchemeGroupVersion.String(), reflect.TypeOf(servingv1.Service{}).Name()); err != nil {
		log.Error(err, "cannot check whether the Knative Service API is installed, skipping its cache selector")
	} else if installed {
		selectors[&servingv1.Service{}] = selector
	}
	if installed, err := kubernetes.IsAPIResourceInstalled(bootstrapClient, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); err != nil {
		log.Error(err, "cannot check whether the CronJob API is installed, skipping its cache selector")
	} else if installed {
		selectors[&batchv1.CronJob{}] = selector
	}

	options := cache.Options{
		ByObject: selectors,
	}
	if !platform.IsCurrentOperatorGlobal() {
		options.DefaultNamespaces = getNamespacesSelector(operatorNamespace, watchNamespace)
	}

	mgr, err := manager.New(cfg, manager.Options{
		LeaderElection:                leaderElection,
		LeaderElectionNamespace:       operatorNamespace,
		LeaderElectionID:              leaderElectionID,
		LeaderElectionResourceLock:    resourcelock.LeasesResourceLock,
		LeaderElectionReleaseOnCancel: true,
		HealthProbeBindAddress:        ":" + strconv.Itoa(int(healthPort)),
		Metrics:                       metricsserver.Options{BindAddress: ":" + strconv.Itoa(int(monitoringPort))},
		Cache:                         options,
	})
	exitOnError(err, "")

	log.Info("Configuring manager")
	exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable add liveness check")
	exitOnError(apis.AddToScheme(mgr.GetScheme()), "")
	ctrlClient, err := client.FromManager(mgr)
	exitOnError(err, "")
	exitOnError(controller.AddToManager(ctx, mgr, ctrlClient), "")

	log.Info("Installing operator resources")
	// The installation of the optional tools is bounded to one minute.
	installCtx, installCancel := context.WithTimeout(ctx, 1*time.Minute)
	defer installCancel()
	install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log)

	// The synthetic Integration manager is opt-in: it only starts when
	// CAMEL_K_SYNTHETIC_INTEGRATIONS is set to exactly "true".
	synthEnvVal, synth := os.LookupEnv("CAMEL_K_SYNTHETIC_INTEGRATIONS")
	if synth && synthEnvVal == "true" {
		log.Info("Starting the synthetic Integration manager")
		exitOnError(synthetic.ManageSyntheticIntegrations(ctx, ctrlClient, mgr.GetCache()), "synthetic Integration manager error")
	} else {
		log.Info("Synthetic Integration manager not configured, skipping")
	}

	log.Info("Starting the manager")
	exitOnError(mgr.Start(ctx), "manager exited non-zero")
}