func startOperator()

in cmd/manager/main.go [436:780]


// startOperator reads the operator configuration from viper, builds and configures the controller manager,
// registers the validating webhook (if enabled) and the controllers, starts background tasks, and then runs
// the manager until ctx is done.
//
// It returns an error when the configuration is invalid, when any setup step fails, when the manager fails
// to start, or when the operator license key cannot be validated. It returns nil once ctx is done.
func startOperator(ctx context.Context) error {
	log.V(1).Info("Effective configuration", "values", viper.AllSettings())

	// update GOMAXPROCS to container cpu limit if necessary
	_, err := maxprocs.Set(maxprocs.Logger(func(s string, i ...interface{}) {
		// maxprocs needs an sprintf format string with args, but our logger needs a string with optional key value pairs,
		// so we need to do this translation
		log.Info(fmt.Sprintf(s, i...))
	}))
	if err != nil {
		log.Error(err, "Error setting GOMAXPROCS")
		return err
	}

	// Validate the auto-port-forward flag before starting any background server, so that an invalid
	// flag combination does not leave the debug HTTP server running behind a returned error.
	var dialer net.Dialer
	autoPortForward := viper.GetBool(operator.AutoPortForwardFlag)
	if autoPortForward {
		if !dev.Enabled {
			return fmt.Errorf("development mode must be enabled to use %s", operator.AutoPortForwardFlag)
		}
		log.Info("Warning: auto-port-forwarding is enabled, which is intended for development only")
		dialer = portforward.NewForwardingDialer()
	}

	if dev.Enabled {
		// expose pprof if development mode is enabled
		mux := http.NewServeMux()
		mux.HandleFunc("/debug/pprof/", pprof.Index)
		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)

		pprofServer := http.Server{
			Addr:              viper.GetString(operator.DebugHTTPListenFlag),
			Handler:           mux,
			ReadHeaderTimeout: 60 * time.Second,
		}
		log.Info("Starting debug HTTP server", "addr", pprofServer.Addr)

		go func() {
			// shut the debug server down once the operator context is done
			go func() {
				<-ctx.Done()

				shutdownCtx, cancelFunc := context.WithTimeout(context.Background(), debugHTTPShutdownTimeout)
				defer cancelFunc()

				if err := pprofServer.Shutdown(shutdownCtx); err != nil {
					log.Error(err, "Failed to shutdown debug HTTP server")
				}
			}()

			// ListenAndServe always returns a non-nil error; http.ErrServerClosed is the expected result of Shutdown.
			if err := pprofServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
				log.Error(err, "Failed to start debug HTTP server")
				panic(err)
			}
		}()
	}

	operatorNamespace := viper.GetString(operator.OperatorNamespaceFlag)
	if operatorNamespace == "" {
		err := fmt.Errorf("operator namespace must be specified using %s", operator.OperatorNamespaceFlag)
		log.Error(err, "Required configuration missing")
		return err
	}

	// set the default container registry
	containerRegistry := viper.GetString(operator.ContainerRegistryFlag)
	log.Info("Setting default container registry", "container_registry", containerRegistry)
	container.SetContainerRegistry(containerRegistry)

	// set the default container repository
	containerRepository := viper.GetString(operator.ContainerRepositoryFlag)
	if containerRepository != "" {
		log.Info("Setting default container repository", "container_repository", containerRepository)
		container.SetContainerRepository(containerRepository)
	}

	// allow users to specify a container suffix unless --ubi-only mode is active
	ubiOnly := viper.GetBool(operator.UBIOnlyFlag)
	suffix := viper.GetString(operator.ContainerSuffixFlag)
	if len(suffix) > 0 {
		// Check the effective value rather than viper.IsSet: IsSet is also true when UBI-only mode is
		// explicitly disabled (e.g. --ubi-only=false), which does not conflict with a custom suffix.
		if ubiOnly {
			err := fmt.Errorf("must not combine %s and %s flags", operator.UBIOnlyFlag, operator.ContainerSuffixFlag)
			log.Error(err, "Illegal flag combination")
			return err
		}
		container.SetContainerSuffix(suffix)
	}

	// enforce UBI stack images if requested
	if ubiOnly {
		container.SetContainerSuffix(container.UBISuffix)
		version.GlobalMinStackVersion = version.From(7, 10, 0)
	}

	// Get a config to talk to the apiserver
	cfg, err := ctrl.GetConfig()
	if err != nil {
		log.Error(err, "Failed to obtain client configuration")
		return err
	}

	if qps := float32(viper.GetFloat64(operator.KubeClientQPS)); qps > 0 {
		cfg.QPS = qps
		cfg.Burst = int(qps * 2)
	}

	// set up APM  tracing if configured
	var tracer *apm.Tracer
	if viper.GetBool(operator.EnableTracingFlag) {
		tracer = tracing.NewTracer("elastic-operator")
		// set up APM tracing for client-go
		cfg.Wrap(tracing.ClientGoTransportWrapper(
			apmclientgo.WithDefaultTransaction(tracing.ClientGoCacheTx(tracer)),
		))
	}

	// set the timeout for API client
	cfg.Timeout = viper.GetDuration(operator.KubeClientTimeout)
	// set the timeout for Elasticsearch requests
	esclient.DefaultESClientTimeout = viper.GetDuration(operator.ElasticsearchClientTimeout)

	// Setup Scheme for all resources
	log.Info("Setting up scheme")
	controllerscheme.SetupScheme()
	// also set up the v1beta1 scheme, used by the v1beta1 webhook
	controllerscheme.SetupV1beta1Scheme()

	// Create a new Cmd to provide shared dependencies and start components
	opts := ctrl.Options{
		Scheme:                     clientgoscheme.Scheme,
		LeaderElection:             viper.GetBool(operator.EnableLeaderElection),
		LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
		LeaderElectionID:           LeaderElectionLeaseName,
		LeaderElectionNamespace:    operatorNamespace,
		Logger:                     log.WithName("eck-operator"),
	}

	// configure the manager cache based on the number of managed namespaces
	var managedNamespaces []string
	// do not use viper.GetStringSlice here as it suffers from https://github.com/spf13/viper/issues/380
	if err := viper.UnmarshalKey(operator.NamespacesFlag, &managedNamespaces); err != nil {
		log.Error(err, "Failed to parse managed namespaces flag")
		return err
	}
	switch {
	case len(managedNamespaces) == 0:
		log.Info("Operator configured to manage all namespaces")
	case len(managedNamespaces) == 1 && managedNamespaces[0] == operatorNamespace:
		log.Info("Operator configured to manage a single namespace", "namespace", managedNamespaces[0], "operator_namespace", operatorNamespace)

	default:
		log.Info("Operator configured to manage multiple namespaces", "namespaces", managedNamespaces, "operator_namespace", operatorNamespace)
		// The managed cache should always include the operator namespace so that we can work with operator-internal resources.
		// Only add it when missing, to avoid passing duplicates to the webhook setup and the async tasks below.
		operatorNamespaceManaged := false
		for _, ns := range managedNamespaces {
			if ns == operatorNamespace {
				operatorNamespaceManaged = true
				break
			}
		}
		if !operatorNamespaceManaged {
			managedNamespaces = append(managedNamespaces, operatorNamespace)
		}
	}

	// implicitly allows watching cluster-scoped resources (e.g. storage classes)
	opts.Cache = cache.Options{DefaultNamespaces: map[string]cache.Config{}}
	for _, ns := range managedNamespaces {
		opts.Cache.DefaultNamespaces[ns] = cache.Config{}
	}

	// only expose prometheus metrics if provided a non-zero port
	metricsPort := viper.GetInt(operator.MetricsPortFlag)
	metricsHost := viper.GetString(operator.MetricsHostFlag)
	// controller-runtime only disables the metrics server for the literal bind address "0";
	// "<host>:0" would instead make it listen on a random port.
	metricsBindAddress := "0"
	if metricsPort != 0 {
		metricsBindAddress = fmt.Sprintf("%s:%d", metricsHost, metricsPort)
		log.Info("Exposing Prometheus metrics on /metrics", "bindAddress", metricsBindAddress)
	}
	opts.Metrics = metricsserver.Options{
		BindAddress: metricsBindAddress,
	}
	if viper.GetBool(operator.MetricsSecureFlag) {
		opts.Metrics.FilterProvider = filters.WithAuthenticationAndAuthorization
		opts.Metrics.SecureServing = true
	}

	if metricsCertDir := viper.GetString(operator.MetricsCertDirFlag); len(metricsCertDir) > 0 {
		opts.Metrics.CertDir = metricsCertDir
	}

	webhookPort := viper.GetInt(operator.WebhookPortFlag)
	webhookCertDir := viper.GetString(operator.WebhookCertDirFlag)
	opts.WebhookServer = crwebhook.NewServer(crwebhook.Options{
		Port:    webhookPort,
		CertDir: webhookCertDir,
	})

	mgr, err := ctrl.NewManager(cfg, opts)
	if err != nil {
		log.Error(err, "Failed to create controller manager")
		return err
	}

	// Retrieve globally shared CA if any
	ca, err := readOptionalCA(viper.GetString(operator.CADirFlag))
	if err != nil {
		log.Error(err, "Cannot read global CA")
		return err
	}

	// Verify cert validity options
	caCertValidity, caCertRotateBefore, err := validateCertExpirationFlags(operator.CACertValidityFlag, operator.CACertRotateBeforeFlag)
	if err != nil {
		log.Error(err, "Invalid CA certificate rotation parameters")
		return err
	}

	log.V(1).Info("Using certificate authority rotation parameters", operator.CACertValidityFlag, caCertValidity, operator.CACertRotateBeforeFlag, caCertRotateBefore)

	certValidity, certRotateBefore, err := validateCertExpirationFlags(operator.CertValidityFlag, operator.CertRotateBeforeFlag)
	if err != nil {
		log.Error(err, "Invalid certificate rotation parameters")
		return err
	}

	log.V(1).Info("Using certificate rotation parameters", operator.CertValidityFlag, certValidity, operator.CertRotateBeforeFlag, certRotateBefore)

	ipFamily, err := chooseAndValidateIPFamily(viper.GetString(operator.IPFamilyFlag), net.ToIPFamily(os.Getenv(settings.EnvPodIP)))
	if err != nil {
		log.Error(err, "Invalid IP family parameter")
		return err
	}

	// Setup a client to set the operator uuid config map
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Error(err, "Failed to create Kubernetes client")
		return err
	}

	distributionChannel := viper.GetString(operator.DistributionChannelFlag)
	operatorInfo, err := about.GetOperatorInfo(clientset, operatorNamespace, distributionChannel)
	if err != nil {
		log.Error(err, "Failed to get operator info")
		return err
	}

	log.Info("Setting up controllers")

	exposedNodeLabels, err := esvalidation.NewExposedNodeLabels(viper.GetStringSlice(operator.ExposedNodeLabels))
	if err != nil {
		log.Error(err, "Failed to parse exposed node labels")
		return err
	}

	setDefaultSecurityContext, err := determineSetDefaultSecurityContext(viper.GetString(operator.SetDefaultSecurityContextFlag), clientset)
	if err != nil {
		log.Error(err, "failed to determine how to set default security context")
		return err
	}

	// default hash cache is arbitrarily set to 5 x MaxConcurrentReconcilesFlag
	hashCacheSize := viper.GetInt(operator.MaxConcurrentReconcilesFlag) * 5
	if viper.IsSet(operator.PasswordHashCacheSize) {
		hashCacheSize = viper.GetInt(operator.PasswordHashCacheSize)
	}
	passwordHasher, err := cryptutil.NewPasswordHasher(hashCacheSize)
	if err != nil {
		log.Error(err, "failed to create hash cache")
		return err
	}

	params := operator.Parameters{
		Dialer:                           dialer,
		ElasticsearchObservationInterval: viper.GetDuration(operator.ElasticsearchObservationIntervalFlag),
		ExposedNodeLabels:                exposedNodeLabels,
		IPFamily:                         ipFamily,
		OperatorNamespace:                operatorNamespace,
		OperatorInfo:                     operatorInfo,
		GlobalCA:                         ca,
		CACertRotation: certificates.RotationParams{
			Validity:     caCertValidity,
			RotateBefore: caCertRotateBefore,
		},
		CertRotation: certificates.RotationParams{
			Validity:     certValidity,
			RotateBefore: certRotateBefore,
		},
		PasswordHasher:            passwordHasher,
		MaxConcurrentReconciles:   viper.GetInt(operator.MaxConcurrentReconcilesFlag),
		SetDefaultSecurityContext: setDefaultSecurityContext,
		ValidateStorageClass:      viper.GetBool(operator.ValidateStorageClassFlag),
		Tracer:                    tracer,
	}

	if viper.GetBool(operator.EnableWebhookFlag) {
		setupWebhook(ctx, mgr, params, webhookCertDir, clientset, exposedNodeLabels, managedNamespaces, tracer)
	}

	enforceRbacOnRefs := viper.GetBool(operator.EnforceRBACOnRefsFlag)

	var accessReviewer rbac.AccessReviewer
	if enforceRbacOnRefs {
		accessReviewer = rbac.NewSubjectAccessReviewer(clientset)
	} else {
		accessReviewer = rbac.NewPermissiveAccessReviewer()
	}

	if err := registerControllers(mgr, params, accessReviewer); err != nil {
		return err
	}

	disableTelemetry := viper.GetBool(operator.DisableTelemetryFlag)
	telemetryInterval := viper.GetDuration(operator.TelemetryIntervalFlag)
	go asyncTasks(ctx, mgr, cfg, managedNamespaces, operatorNamespace, operatorInfo, disableTelemetry, telemetryInterval, tracer)

	log.Info("Starting the manager", "uuid", operatorInfo.OperatorUUID,
		"namespace", operatorNamespace, "version", operatorInfo.BuildInfo.Version,
		"build_hash", operatorInfo.BuildInfo.Hash, "build_date", operatorInfo.BuildInfo.Date,
		"build_snapshot", operatorInfo.BuildInfo.Snapshot)

	// Buffered with one slot per sender goroutine below, so that neither of them can block forever
	// on the send once this function has already returned (ctx done, or the other goroutine's error received).
	exitOnErr := make(chan error, 2)

	// start the manager
	go func() {
		if err := mgr.Start(ctx); err != nil {
			log.Error(err, "Failed to start the controller manager")
			exitOnErr <- err
		}
	}()

	// check operator license key
	go func() {
		// WaitForCacheSync returns false when the caches could not be synced (e.g. ctx was cancelled);
		// validating the license against a cancelled context would only report a spurious error.
		if !mgr.GetCache().WaitForCacheSync(ctx) {
			return
		}

		lc := commonlicense.NewLicenseChecker(mgr.GetClient(), params.OperatorNamespace)
		licenseType, err := lc.ValidOperatorLicenseKeyType(ctx)
		if err != nil {
			log.Error(err, "Failed to validate operator license key")
			exitOnErr <- err
			return
		}
		log.Info("Operator license key validated", "license_type", licenseType)
	}()

	select {
	case err = <-exitOnErr:
		return err
	case <-ctx.Done():
		return nil
	}
}