in pkg/cmd/operator/operator.go [86:240]
func Run(healthPort, monitoringPort int32, leaderElection bool, leaderElectionID string) {
	flag.Parse()

	// The logger instantiated here can be changed to any logger
	// implementing the logr.Logger interface. This logger will
	// be propagated through the whole operator, generating
	// uniform and structured logs.
	// The constants specified here are zap specific
	var logLevel zapcore.Level
	logLevelVal, ok := os.LookupEnv("LOG_LEVEL")
	if ok {
		switch strings.ToLower(logLevelVal) {
		case "error":
			logLevel = zapcore.ErrorLevel
		case "info":
			logLevel = zapcore.InfoLevel
		case "debug":
			logLevel = zapcore.DebugLevel
		default:
			customLevel, err := strconv.Atoi(strings.ToLower(logLevelVal))
			exitOnError(err, "Invalid log-level")
			// Need to multiply by -1 to turn logr expected level into zap level
			logLevel = zapcore.Level(int8(customLevel) * -1)
		}
	} else {
		logLevel = zapcore.InfoLevel
	}
	// Use and set atomic level that all following log events are compared with
	// in order to evaluate if a given log level on the event is enabled.
	logf.SetLogger(zapctrl.New(func(o *zapctrl.Options) {
		o.Development = false
		o.Level = zap.NewAtomicLevelAt(logLevel)
	}))
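	// Route klog output (used by client-go and other Kubernetes libraries)
	// through the same structured logger, so the operator emits one log format.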
	klog.SetLogger(log.AsLogger())
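	// automaxprocs aligns GOMAXPROCS with the container CPU quota (cgroups),
	// so the Go runtime does not schedule more threads than the CPU limit allows.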
	_, err := maxprocs.Set(maxprocs.Logger(func(f string, a ...interface{}) { log.Info(fmt.Sprintf(f, a...)) }))
	if err != nil {
		log.Error(err, "failed to set GOMAXPROCS from cgroups")
	}
	printVersion()
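	// The watch namespace defines the operator scope: a single namespace when
	// set, the whole cluster when empty (global operator).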
	watchNamespace, err := getWatchNamespace()
	exitOnError(err, "failed to get watch namespace")
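	// ctx is cancelled on SIGINT/SIGTERM; the cancellation also releases the
	// leader election Lease on shutdown (see LeaderElectionReleaseOnCancel below).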
	ctx := signals.SetupSignalHandler()

	cfg, err := config.GetConfig()
	exitOnError(err, "cannot get client config")
	// Increase the QPS and burst limits used by client-side throttling,
	// to prevent the requests made to apply the bundled Kamelets
	// from being throttled.
	cfg.QPS = 20
	cfg.Burst = 200
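	// The bootstrap client is used before the manager (and its cache) starts,
	// for permission checks and for installing the startup resources below.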
	bootstrapClient, err := client.NewClientWithConfig(false, cfg)
	exitOnError(err, "cannot initialize client")

	// We do not rely on the event broadcaster managed by controller runtime,
	// so that we can check the operator has been granted permission to create
	// Events. This is required for the operator to be installable by standard
	// admin users, who are not granted create permission on Events by default.
	broadcaster := record.NewBroadcaster()
	defer broadcaster.Shutdown()

	if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, corev1.GroupName, "events", watchNamespace, "", "create"); err != nil || !ok {
		// Do not sink Events to the server as they'll be rejected
		broadcaster = event.NewSinkLessBroadcaster(broadcaster)
		exitOnError(err, "cannot check permissions for creating Events")
		log.Info("Event broadcasting is disabled because of missing permissions to create Events")
	}
	operatorNamespace := platform.GetOperatorNamespace()
	if operatorNamespace == "" {
		// Fall back to the watch namespace when the operator is not running in-cluster.
		// A local (off-cluster) operator watching resources globally is not supported,
		// as in that case it's not possible to determine a namespace.
		operatorNamespace = watchNamespace
		if operatorNamespace == "" {
			leaderElection = false
			log.Info("unable to determine namespace for leader election")
		}
	}
	// Set the operator container image if it runs in-container
	platform.OperatorImage, err = getOperatorImage(ctx, bootstrapClient)
	exitOnError(err, "cannot get operator container image")

	if ok, err := kubernetes.CheckPermission(ctx, bootstrapClient, coordination.GroupName, "leases", operatorNamespace, "", "create"); err != nil || !ok {
		leaderElection = false
		exitOnError(err, "cannot check permissions for creating Leases")
		log.Info("The operator is not granted permissions to create Leases")
	}

	if !leaderElection {
		log.Info("Leader election is disabled!")
	}
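	// Restrict the manager cache to resources labelled as belonging to an
	// Integration, so the operator does not cache and watch every Pod,
	// Deployment, Job and Service in the watched namespaces.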
	hasIntegrationLabel, err := labels.NewRequirement(v1.IntegrationLabel, selection.Exists, []string{})
	exitOnError(err, "cannot create Integration label selector")
	selector := labels.NewSelector().Add(*hasIntegrationLabel)

	selectors := cache.SelectorsByObject{
		&corev1.Pod{}:        {Label: selector},
		&appsv1.Deployment{}: {Label: selector},
		&batchv1.Job{}:       {Label: selector},
		&servingv1.Service{}: {Label: selector},
	}
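	// Select CronJobs only when the cluster serves them under batch/v1;
	// older clusters expose CronJobs only under batch/v1beta1.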
	if ok, err := kubernetes.IsAPIResourceInstalled(bootstrapClient, batchv1.SchemeGroupVersion.String(), reflect.TypeOf(batchv1.CronJob{}).Name()); ok && err == nil {
		selectors[&batchv1.CronJob{}] = struct {
			Label labels.Selector
			Field fields.Selector
		}{
			Label: selector,
		}
	}
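	// The manager serves the health probe and metrics endpoints on the given
	// ports and, when enabled, coordinates leader election through a Lease
	// in the operator namespace.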
	mgr, err := manager.New(cfg, manager.Options{
		Namespace:                     watchNamespace,
		EventBroadcaster:              broadcaster,
		LeaderElection:                leaderElection,
		LeaderElectionNamespace:       operatorNamespace,
		LeaderElectionID:              leaderElectionID,
		LeaderElectionResourceLock:    resourcelock.LeasesResourceLock,
		LeaderElectionReleaseOnCancel: true,
		HealthProbeBindAddress:        ":" + strconv.Itoa(int(healthPort)),
		MetricsBindAddress:            ":" + strconv.Itoa(int(monitoringPort)),
		NewCache: cache.BuilderWithOptions(
			cache.Options{
				SelectorsByObject: selectors,
			},
		),
	})
	exitOnError(err, "")
	log.Info("Configuring manager")
	exitOnError(mgr.AddHealthzCheck("health-probe", healthz.Ping), "Unable to add liveness check")
	exitOnError(apis.AddToScheme(mgr.GetScheme()), "")
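	// Build the client backed by the manager and register all controllers
	// with the manager before it is started.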
	ctrlClient, err := client.FromManager(mgr)
	exitOnError(err, "")
	exitOnError(controller.AddToManager(ctx, mgr, ctrlClient), "")
	log.Info("Installing operator resources")
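	// Bound the startup installation steps to one minute so they cannot
	// hang indefinitely on a slow or unreachable API server.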
	installCtx, installCancel := context.WithTimeout(ctx, 1*time.Minute)
	defer installCancel()
	install.OperatorStartupOptionalTools(installCtx, bootstrapClient, watchNamespace, operatorNamespace, log)
	exitOnError(findOrCreateIntegrationPlatform(installCtx, bootstrapClient, operatorNamespace), "failed to create integration platform")
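	// mgr.Start blocks until the signal context is cancelled, running the
	// shared cache, the health and metrics endpoints and all registered controllers.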
	log.Info("Starting the manager")
	exitOnError(mgr.Start(ctx), "manager exited non-zero")
}