in cmd/manager/main.go [436:780]
// startOperator configures and runs the operator until ctx is cancelled or a fatal error occurs.
//
// All configuration is read from viper. In order, the function:
//   - adjusts GOMAXPROCS via maxprocs and, when development mode is enabled, serves pprof
//     handlers on the address given by operator.DebugHTTPListenFlag;
//   - validates flag combinations (auto port-forwarding, container suffix vs UBI-only) and
//     requires an operator namespace;
//   - builds the Kubernetes client configuration (QPS/burst, timeout, optional APM tracing);
//   - creates the controller-runtime manager with a cache restricted to the managed namespaces
//     (plus the operator namespace when several namespaces are managed), the metrics server
//     and the webhook server;
//   - assembles operator.Parameters, optionally sets up the webhook, and registers controllers;
//   - starts background tasks, the manager, and an operator license key check.
//
// It returns the first error encountered during setup, the first error reported by the manager
// or the license key check, or nil once ctx is done.
func startOperator(ctx context.Context) error {
	log.V(1).Info("Effective configuration", "values", viper.AllSettings())

	// update GOMAXPROCS to container cpu limit if necessary
	_, err := maxprocs.Set(maxprocs.Logger(func(s string, i ...interface{}) {
		// maxprocs needs an sprintf format string with args, but our logger needs a string with optional key value pairs,
		// so we need to do this translation
		log.Info(fmt.Sprintf(s, i...))
	}))
	if err != nil {
		log.Error(err, "Error setting GOMAXPROCS")
		return err
	}

	if dev.Enabled {
		// expose pprof if development mode is enabled
		mux := http.NewServeMux()
		mux.HandleFunc("/debug/pprof/", pprof.Index)
		mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
		mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
		mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
		mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
		pprofServer := http.Server{
			Addr:              viper.GetString(operator.DebugHTTPListenFlag),
			Handler:           mux,
			ReadHeaderTimeout: 60 * time.Second,
		}
		log.Info("Starting debug HTTP server", "addr", pprofServer.Addr)
		go func() {
			// shut the debug server down once the operator context is cancelled
			go func() {
				<-ctx.Done()
				shutdownCtx, cancelFunc := context.WithTimeout(context.Background(), debugHTTPShutdownTimeout)
				defer cancelFunc()
				if err := pprofServer.Shutdown(shutdownCtx); err != nil {
					log.Error(err, "Failed to shutdown debug HTTP server")
				}
			}()
			// ErrServerClosed is the expected outcome of Shutdown; any other error (e.g. the
			// listen address being unavailable) crashes the process, as this is development mode only.
			if err := pprofServer.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
				log.Error(err, "Failed to start debug HTTP server")
				panic(err)
			}
		}()
	}

	var dialer net.Dialer
	if autoPortForward := viper.GetBool(operator.AutoPortForwardFlag); autoPortForward {
		if !dev.Enabled {
			err := fmt.Errorf("development mode must be enabled to use %s", operator.AutoPortForwardFlag)
			log.Error(err, "Illegal flag combination")
			return err
		}
		log.Info("Warning: auto-port-forwarding is enabled, which is intended for development only")
		dialer = portforward.NewForwardingDialer()
	}

	operatorNamespace := viper.GetString(operator.OperatorNamespaceFlag)
	if operatorNamespace == "" {
		err := fmt.Errorf("operator namespace must be specified using %s", operator.OperatorNamespaceFlag)
		log.Error(err, "Required configuration missing")
		return err
	}

	// set the default container registry
	containerRegistry := viper.GetString(operator.ContainerRegistryFlag)
	log.Info("Setting default container registry", "container_registry", containerRegistry)
	container.SetContainerRegistry(containerRegistry)

	// set the default container repository
	containerRepository := viper.GetString(operator.ContainerRepositoryFlag)
	if containerRepository != "" {
		log.Info("Setting default container repository", "container_repository", containerRepository)
		container.SetContainerRepository(containerRepository)
	}

	// allow users to specify a container suffix unless --ubi-only mode is active
	suffix := viper.GetString(operator.ContainerSuffixFlag)
	if len(suffix) > 0 {
		if viper.IsSet(operator.UBIOnlyFlag) {
			err := fmt.Errorf("must not combine %s and %s flags", operator.UBIOnlyFlag, operator.ContainerSuffixFlag)
			log.Error(err, "Illegal flag combination")
			return err
		}
		container.SetContainerSuffix(suffix)
	}

	// enforce UBI stack images if requested
	ubiOnly := viper.GetBool(operator.UBIOnlyFlag)
	if ubiOnly {
		container.SetContainerSuffix(container.UBISuffix)
		version.GlobalMinStackVersion = version.From(7, 10, 0)
	}

	// Get a config to talk to the apiserver
	cfg, err := ctrl.GetConfig()
	if err != nil {
		log.Error(err, "Failed to obtain client configuration")
		return err
	}
	if qps := float32(viper.GetFloat64(operator.KubeClientQPS)); qps > 0 {
		cfg.QPS = qps
		cfg.Burst = int(qps * 2)
	}

	// set up APM tracing if configured
	var tracer *apm.Tracer
	if viper.GetBool(operator.EnableTracingFlag) {
		tracer = tracing.NewTracer("elastic-operator")
		// set up APM tracing for client-go
		cfg.Wrap(tracing.ClientGoTransportWrapper(
			apmclientgo.WithDefaultTransaction(tracing.ClientGoCacheTx(tracer)),
		))
	}

	// set the timeout for API client
	cfg.Timeout = viper.GetDuration(operator.KubeClientTimeout)
	// set the timeout for Elasticsearch requests
	esclient.DefaultESClientTimeout = viper.GetDuration(operator.ElasticsearchClientTimeout)

	// Setup Scheme for all resources
	log.Info("Setting up scheme")
	controllerscheme.SetupScheme()
	// also set up the v1beta1 scheme, used by the v1beta1 webhook
	controllerscheme.SetupV1beta1Scheme()

	// Create a new Cmd to provide shared dependencies and start components
	opts := ctrl.Options{
		Scheme:                     clientgoscheme.Scheme,
		LeaderElection:             viper.GetBool(operator.EnableLeaderElection),
		LeaderElectionResourceLock: resourcelock.LeasesResourceLock,
		LeaderElectionID:           LeaderElectionLeaseName,
		LeaderElectionNamespace:    operatorNamespace,
		Logger:                     log.WithName("eck-operator"),
	}

	// configure the manager cache based on the number of managed namespaces
	var managedNamespaces []string
	// do not use viper.GetStringSlice here as it suffers from https://github.com/spf13/viper/issues/380
	if err := viper.UnmarshalKey(operator.NamespacesFlag, &managedNamespaces); err != nil {
		log.Error(err, "Failed to parse managed namespaces flag")
		return err
	}
	switch {
	case len(managedNamespaces) == 0:
		log.Info("Operator configured to manage all namespaces")
	case len(managedNamespaces) == 1 && managedNamespaces[0] == operatorNamespace:
		log.Info("Operator configured to manage a single namespace", "namespace", managedNamespaces[0], "operator_namespace", operatorNamespace)
	default:
		log.Info("Operator configured to manage multiple namespaces", "namespaces", managedNamespaces, "operator_namespace", operatorNamespace)
		// The managed cache should always include the operator namespace so that we can work with operator-internal resources.
		managedNamespaces = append(managedNamespaces, operatorNamespace)
	}
	// implicitly allows watching cluster-scoped resources (e.g. storage classes)
	opts.Cache = cache.Options{DefaultNamespaces: map[string]cache.Config{}}
	for _, ns := range managedNamespaces {
		opts.Cache.DefaultNamespaces[ns] = cache.Config{}
	}

	// only expose prometheus metrics if provided a non-zero port.
	// controller-runtime disables the metrics server only for the exact bind address "0";
	// "host:0" would instead make it listen on a random port.
	metricsPort := viper.GetInt(operator.MetricsPortFlag)
	metricsHost := viper.GetString(operator.MetricsHostFlag)
	metricsBindAddress := "0"
	if metricsPort != 0 {
		metricsBindAddress = fmt.Sprintf("%s:%d", metricsHost, metricsPort)
		log.Info("Exposing Prometheus metrics on /metrics", "bindAddress", metricsBindAddress)
	}
	opts.Metrics = metricsserver.Options{
		BindAddress: metricsBindAddress,
	}
	if viper.GetBool(operator.MetricsSecureFlag) {
		opts.Metrics.FilterProvider = filters.WithAuthenticationAndAuthorization
		opts.Metrics.SecureServing = true
	}
	if metricsCertDir := viper.GetString(operator.MetricsCertDirFlag); len(metricsCertDir) > 0 {
		opts.Metrics.CertDir = metricsCertDir
	}

	webhookPort := viper.GetInt(operator.WebhookPortFlag)
	webhookCertDir := viper.GetString(operator.WebhookCertDirFlag)
	opts.WebhookServer = crwebhook.NewServer(crwebhook.Options{
		Port:    webhookPort,
		CertDir: webhookCertDir,
	})

	mgr, err := ctrl.NewManager(cfg, opts)
	if err != nil {
		log.Error(err, "Failed to create controller manager")
		return err
	}

	// Retrieve globally shared CA if any
	ca, err := readOptionalCA(viper.GetString(operator.CADirFlag))
	if err != nil {
		log.Error(err, "Cannot read global CA")
		return err
	}

	// Verify cert validity options
	caCertValidity, caCertRotateBefore, err := validateCertExpirationFlags(operator.CACertValidityFlag, operator.CACertRotateBeforeFlag)
	if err != nil {
		log.Error(err, "Invalid CA certificate rotation parameters")
		return err
	}
	log.V(1).Info("Using certificate authority rotation parameters", operator.CACertValidityFlag, caCertValidity, operator.CACertRotateBeforeFlag, caCertRotateBefore)
	certValidity, certRotateBefore, err := validateCertExpirationFlags(operator.CertValidityFlag, operator.CertRotateBeforeFlag)
	if err != nil {
		log.Error(err, "Invalid certificate rotation parameters")
		return err
	}
	log.V(1).Info("Using certificate rotation parameters", operator.CertValidityFlag, certValidity, operator.CertRotateBeforeFlag, certRotateBefore)

	ipFamily, err := chooseAndValidateIPFamily(viper.GetString(operator.IPFamilyFlag), net.ToIPFamily(os.Getenv(settings.EnvPodIP)))
	if err != nil {
		log.Error(err, "Invalid IP family parameter")
		return err
	}

	// Setup a client to set the operator uuid config map
	clientset, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		log.Error(err, "Failed to create Kubernetes client")
		return err
	}

	distributionChannel := viper.GetString(operator.DistributionChannelFlag)
	operatorInfo, err := about.GetOperatorInfo(clientset, operatorNamespace, distributionChannel)
	if err != nil {
		log.Error(err, "Failed to get operator info")
		return err
	}

	log.Info("Setting up controllers")
	exposedNodeLabels, err := esvalidation.NewExposedNodeLabels(viper.GetStringSlice(operator.ExposedNodeLabels))
	if err != nil {
		log.Error(err, "Failed to parse exposed node labels")
		return err
	}
	setDefaultSecurityContext, err := determineSetDefaultSecurityContext(viper.GetString(operator.SetDefaultSecurityContextFlag), clientset)
	if err != nil {
		log.Error(err, "failed to determine how to set default security context")
		return err
	}

	// default hash cache is arbitrarily set to 5 x MaxConcurrentReconcilesFlag
	hashCacheSize := viper.GetInt(operator.MaxConcurrentReconcilesFlag) * 5
	if viper.IsSet(operator.PasswordHashCacheSize) {
		hashCacheSize = viper.GetInt(operator.PasswordHashCacheSize)
	}
	passwordHasher, err := cryptutil.NewPasswordHasher(hashCacheSize)
	if err != nil {
		log.Error(err, "failed to create hash cache")
		return err
	}

	params := operator.Parameters{
		Dialer:                           dialer,
		ElasticsearchObservationInterval: viper.GetDuration(operator.ElasticsearchObservationIntervalFlag),
		ExposedNodeLabels:                exposedNodeLabels,
		IPFamily:                         ipFamily,
		OperatorNamespace:                operatorNamespace,
		OperatorInfo:                     operatorInfo,
		GlobalCA:                         ca,
		CACertRotation: certificates.RotationParams{
			Validity:     caCertValidity,
			RotateBefore: caCertRotateBefore,
		},
		CertRotation: certificates.RotationParams{
			Validity:     certValidity,
			RotateBefore: certRotateBefore,
		},
		PasswordHasher:            passwordHasher,
		MaxConcurrentReconciles:   viper.GetInt(operator.MaxConcurrentReconcilesFlag),
		SetDefaultSecurityContext: setDefaultSecurityContext,
		ValidateStorageClass:      viper.GetBool(operator.ValidateStorageClassFlag),
		Tracer:                    tracer,
	}

	if viper.GetBool(operator.EnableWebhookFlag) {
		setupWebhook(ctx, mgr, params, webhookCertDir, clientset, exposedNodeLabels, managedNamespaces, tracer)
	}

	enforceRbacOnRefs := viper.GetBool(operator.EnforceRBACOnRefsFlag)
	var accessReviewer rbac.AccessReviewer
	if enforceRbacOnRefs {
		accessReviewer = rbac.NewSubjectAccessReviewer(clientset)
	} else {
		accessReviewer = rbac.NewPermissiveAccessReviewer()
	}

	if err := registerControllers(mgr, params, accessReviewer); err != nil {
		return err
	}

	disableTelemetry := viper.GetBool(operator.DisableTelemetryFlag)
	telemetryInterval := viper.GetDuration(operator.TelemetryIntervalFlag)
	go asyncTasks(ctx, mgr, cfg, managedNamespaces, operatorNamespace, operatorInfo, disableTelemetry, telemetryInterval, tracer)

	log.Info("Starting the manager", "uuid", operatorInfo.OperatorUUID,
		"namespace", operatorNamespace, "version", operatorInfo.BuildInfo.Version,
		"build_hash", operatorInfo.BuildInfo.Hash, "build_date", operatorInfo.BuildInfo.Date,
		"build_snapshot", operatorInfo.BuildInfo.Snapshot)

	// Each of the two goroutines below sends at most one error. The channel has room for both,
	// so neither sender blocks forever (leaks) once this function has already returned on the
	// first error or on context cancellation.
	exitOnErr := make(chan error, 2)

	// start the manager
	go func() {
		if err := mgr.Start(ctx); err != nil {
			log.Error(err, "Failed to start the controller manager")
			exitOnErr <- err
		}
	}()

	// check operator license key
	go func() {
		if !mgr.GetCache().WaitForCacheSync(ctx) && ctx.Err() != nil {
			// the operator is shutting down before the cache synced: nothing left to validate
			return
		}
		lc := commonlicense.NewLicenseChecker(mgr.GetClient(), params.OperatorNamespace)
		licenseType, err := lc.ValidOperatorLicenseKeyType(ctx)
		if err != nil {
			log.Error(err, "Failed to validate operator license key")
			exitOnErr <- err
		} else {
			log.Info("Operator license key validated", "license_type", licenseType)
		}
	}()

	select {
	case err := <-exitOnErr:
		return err
	case <-ctx.Done():
		return nil
	}
}