func NewOperator()

in pkg/operator/operator.go [101:233]


func NewOperator(ctx context.Context, operator *operator.Operator) (context.Context, *Operator) {
	kubeletCompatibilityAnnotationKey := fmt.Sprintf("%s/%s", apis.CompatibilityGroup, "v1beta1-kubelet-conversion")
	// we are going to panic if any of the customer nodepools contain
	// compatibility.karpenter.sh/v1beta1-kubelet-conversion
	restConfig := clinetconfig.GetConfigOrDie()
	kubeClient := lo.Must(client.New(restConfig, client.Options{}))
	nodePoolList := &karpv1.NodePoolList{}
	lo.Must0(kubeClient.List(ctx, nodePoolList))
	npNames := lo.FilterMap(nodePoolList.Items, func(np karpv1.NodePool, _ int) (string, bool) {
		_, ok := np.Annotations[kubeletCompatibilityAnnotationKey]
		return np.Name, ok
	})
	if len(npNames) != 0 {
		stdlog.Fatalf("The kubelet compatibility annotation, %s, is not supported on Karpenter v1.1+. Please refer to the upgrade guide in the docs. The following NodePools still have the compatibility annotation: %s", kubeletCompatibilityAnnotationKey, strings.Join(npNames, ", "))
	}

	cfg := prometheusv2.WithPrometheusMetrics(WithUserAgent(lo.Must(config.LoadDefaultConfig(ctx))), crmetrics.Registry)
	cfg.APIOptions = append(cfg.APIOptions, middleware.StructuredErrorHandler)
	if cfg.Region == "" {
		log.FromContext(ctx).V(1).Info("retrieving region from IMDS")
		region := lo.Must(imds.NewFromConfig(cfg).GetRegion(ctx, nil))
		cfg.Region = region.Region
	}
	ec2api := ec2.NewFromConfig(cfg)
	eksapi := eks.NewFromConfig(cfg)
	log.FromContext(ctx).WithValues("region", cfg.Region).V(1).Info("discovered region")
	if err := CheckEC2Connectivity(ctx, ec2api); err != nil {
		log.FromContext(ctx).Error(err, "ec2 api connectivity check failed")
		os.Exit(1)
	}
	log.FromContext(ctx).WithValues("region", cfg.Region).V(1).Info("discovered region")
	clusterEndpoint, err := ResolveClusterEndpoint(ctx, eksapi)
	if err != nil {
		log.FromContext(ctx).Error(err, "failed detecting cluster endpoint")
		os.Exit(1)
	} else {
		log.FromContext(ctx).WithValues("cluster-endpoint", clusterEndpoint).V(1).Info("discovered cluster endpoint")
	}
	kubeDNSIP, err := KubeDNSIP(ctx, operator.KubernetesInterface)
	if err != nil {
		// If we fail to get the kube-dns IP, we don't want to crash because this causes issues with custom DNS setups
		// https://github.com/aws/karpenter-provider-aws/issues/2787
		log.FromContext(ctx).V(1).Info(fmt.Sprintf("unable to detect the IP of the kube-dns service, %s", err))
	} else {
		log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns")
	}
	unavailableOfferingsCache := awscache.NewUnavailableOfferings()
	ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)
	validationCache := cache.New(awscache.ValidationTTL, awscache.DefaultCleanupInterval)

	subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
	securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
	instanceProfileProvider := instanceprofile.NewDefaultProvider(iam.NewFromConfig(cfg), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval))
	pricingProvider := pricing.NewDefaultProvider(
		ctx,
		pricing.NewAPI(cfg),
		ec2api,
		cfg.Region,
	)
	versionProvider := version.NewDefaultProvider(operator.KubernetesInterface, eksapi)
	// Ensure we're able to hydrate the version before starting any reliant controllers.
	// Version updates are hydrated asynchronously after this, in the event of a failure
	// the previously resolved value will be used.
	lo.Must0(versionProvider.UpdateVersion(ctx))
	ssmProvider := ssmp.NewDefaultProvider(ssm.NewFromConfig(cfg), ssmCache)
	amiProvider := amifamily.NewDefaultProvider(operator.Clock, versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
	amiResolver := amifamily.NewDefaultResolver()
	launchTemplateProvider := launchtemplate.NewDefaultProvider(
		ctx,
		cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
		ec2api,
		eksapi,
		amiResolver,
		securityGroupProvider,
		subnetProvider,
		lo.Must(GetCABundle(ctx, operator.GetConfig())),
		operator.Elected(),
		kubeDNSIP,
		clusterEndpoint,
	)
	capacityReservationProvider := capacityreservation.NewProvider(
		ec2api,
		operator.Clock,
		cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
		cache.New(awscache.CapacityReservationAvailabilityTTL, awscache.DefaultCleanupInterval),
	)
	instanceTypeProvider := instancetype.NewDefaultProvider(
		cache.New(awscache.InstanceTypesZonesAndOfferingsTTL, awscache.DefaultCleanupInterval),
		cache.New(awscache.InstanceTypesZonesAndOfferingsTTL, awscache.DefaultCleanupInterval),
		cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval),
		ec2api,
		subnetProvider,
		pricingProvider,
		capacityReservationProvider,
		unavailableOfferingsCache,
		instancetype.NewDefaultResolver(cfg.Region),
	)
	instanceProvider := instance.NewDefaultProvider(
		ctx,
		cfg.Region,
		operator.EventRecorder,
		ec2api,
		unavailableOfferingsCache,
		subnetProvider,
		launchTemplateProvider,
		capacityReservationProvider,
	)

	// Setup field indexers on instanceID -- specifically for the interruption controller
	if options.FromContext(ctx).InterruptionQueue != "" {
		SetupIndexers(ctx, operator.Manager)
	}
	return ctx, &Operator{
		Operator:                    operator,
		Config:                      cfg,
		UnavailableOfferingsCache:   unavailableOfferingsCache,
		SSMCache:                    ssmCache,
		ValidationCache:             validationCache,
		SubnetProvider:              subnetProvider,
		SecurityGroupProvider:       securityGroupProvider,
		InstanceProfileProvider:     instanceProfileProvider,
		AMIProvider:                 amiProvider,
		AMIResolver:                 amiResolver,
		VersionProvider:             versionProvider,
		LaunchTemplateProvider:      launchTemplateProvider,
		PricingProvider:             pricingProvider,
		InstanceTypesProvider:       instanceTypeProvider,
		InstanceProvider:            instanceProvider,
		SSMProvider:                 ssmProvider,
		CapacityReservationProvider: capacityReservationProvider,
		EC2API:                      ec2api,
	}
}