in pkg/operator/operator.go [101:233]
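// NewOperator builds the AWS-specific Operator on top of the core Karpenter operator:
// it loads and instruments the AWS SDK configuration, verifies API connectivity, and
// wires up the caches and providers consumed by the controllers.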
func NewOperator(ctx context.Context, operator *operator.Operator) (context.Context, *Operator) {
kubeletCompatibilityAnnotationKey := fmt.Sprintf("%s/%s", apis.CompatibilityGroup, "v1beta1-kubelet-conversion")
// Fail fast (fatal exit) if any customer NodePools still carry the
// compatibility.karpenter.sh/v1beta1-kubelet-conversion annotation
restConfig := clientconfig.GetConfigOrDie()
kubeClient := lo.Must(client.New(restConfig, client.Options{}))
nodePoolList := &karpv1.NodePoolList{}
lo.Must0(kubeClient.List(ctx, nodePoolList))
npNames := lo.FilterMap(nodePoolList.Items, func(np karpv1.NodePool, _ int) (string, bool) {
_, ok := np.Annotations[kubeletCompatibilityAnnotationKey]
return np.Name, ok
})
if len(npNames) != 0 {
stdlog.Fatalf("The kubelet compatibility annotation, %s, is not supported on Karpenter v1.1+. Please refer to the upgrade guide in the docs. The following NodePools still have the compatibility annotation: %s", kubeletCompatibilityAnnotationKey, strings.Join(npNames, ", "))
}
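// Load the AWS SDK config with Prometheus-instrumented clients, falling back to IMDS
// for region discovery when no region is configured.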
cfg := prometheusv2.WithPrometheusMetrics(WithUserAgent(lo.Must(config.LoadDefaultConfig(ctx))), crmetrics.Registry)
cfg.APIOptions = append(cfg.APIOptions, middleware.StructuredErrorHandler)
if cfg.Region == "" {
log.FromContext(ctx).V(1).Info("retrieving region from IMDS")
region := lo.Must(imds.NewFromConfig(cfg).GetRegion(ctx, nil))
cfg.Region = region.Region
}
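// Construct the EC2 and EKS clients and fail startup if the EC2 API is unreachable.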
ec2api := ec2.NewFromConfig(cfg)
eksapi := eks.NewFromConfig(cfg)
log.FromContext(ctx).WithValues("region", cfg.Region).V(1).Info("discovered region")
if err := CheckEC2Connectivity(ctx, ec2api); err != nil {
log.FromContext(ctx).Error(err, "ec2 api connectivity check failed")
os.Exit(1)
}
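// Resolve the cluster endpoint; startup aborts if it cannot be determined.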
clusterEndpoint, err := ResolveClusterEndpoint(ctx, eksapi)
if err != nil {
log.FromContext(ctx).Error(err, "failed detecting cluster endpoint")
os.Exit(1)
} else {
log.FromContext(ctx).WithValues("cluster-endpoint", clusterEndpoint).V(1).Info("discovered cluster endpoint")
}
kubeDNSIP, err := KubeDNSIP(ctx, operator.KubernetesInterface)
if err != nil {
// If we fail to get the kube-dns IP, don't crash: the lookup can legitimately fail with custom DNS setups
// https://github.com/aws/karpenter-provider-aws/issues/2787
log.FromContext(ctx).V(1).Info(fmt.Sprintf("unable to detect the IP of the kube-dns service, %s", err))
} else {
log.FromContext(ctx).WithValues("kube-dns-ip", kubeDNSIP).V(1).Info("discovered kube dns")
}
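// Shared caches for unavailable offerings, SSM parameter lookups, and validation results.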
unavailableOfferingsCache := awscache.NewUnavailableOfferings()
ssmCache := cache.New(awscache.SSMCacheTTL, awscache.DefaultCleanupInterval)
validationCache := cache.New(awscache.ValidationTTL, awscache.DefaultCleanupInterval)
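// Resource providers, each backed by its own TTL cache: subnets, security groups,
// instance profiles, and pricing.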
subnetProvider := subnet.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AvailableIPAddressTTL, awscache.DefaultCleanupInterval), cache.New(awscache.AssociatePublicIPAddressTTL, awscache.DefaultCleanupInterval))
securityGroupProvider := securitygroup.NewDefaultProvider(ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
instanceProfileProvider := instanceprofile.NewDefaultProvider(iam.NewFromConfig(cfg), cache.New(awscache.InstanceProfileTTL, awscache.DefaultCleanupInterval))
pricingProvider := pricing.NewDefaultProvider(
ctx,
pricing.NewAPI(cfg),
ec2api,
cfg.Region,
)
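// versionProvider resolves the cluster's Kubernetes version, which the AMI provider below depends on.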
versionProvider := version.NewDefaultProvider(operator.KubernetesInterface, eksapi)
// Ensure we're able to hydrate the version before starting any reliant controllers.
// Version updates are hydrated asynchronously after this; in the event of a failure,
// the previously resolved value will be used.
lo.Must0(versionProvider.UpdateVersion(ctx))
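// AMI discovery and resolution, backed by SSM parameters and the EC2 API.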
ssmProvider := ssmp.NewDefaultProvider(ssm.NewFromConfig(cfg), ssmCache)
amiProvider := amifamily.NewDefaultProvider(operator.Clock, versionProvider, ssmProvider, ec2api, cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval))
amiResolver := amifamily.NewDefaultResolver()
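// The launch template provider consumes the security group, subnet, CA bundle,
// kube-dns IP, and cluster endpoint values discovered above.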
launchTemplateProvider := launchtemplate.NewDefaultProvider(
ctx,
cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
ec2api,
eksapi,
amiResolver,
securityGroupProvider,
subnetProvider,
lo.Must(GetCABundle(ctx, operator.GetConfig())),
operator.Elected(),
kubeDNSIP,
clusterEndpoint,
)
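// Capacity reservation tracking, with a separate cache for reservation availability.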
capacityReservationProvider := capacityreservation.NewProvider(
ec2api,
operator.Clock,
cache.New(awscache.DefaultTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.CapacityReservationAvailabilityTTL, awscache.DefaultCleanupInterval),
)
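// The instance type provider aggregates offering, pricing, capacity reservation,
// and unavailable-offering data for the region.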
instanceTypeProvider := instancetype.NewDefaultProvider(
cache.New(awscache.InstanceTypesZonesAndOfferingsTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.InstanceTypesZonesAndOfferingsTTL, awscache.DefaultCleanupInterval),
cache.New(awscache.DiscoveredCapacityCacheTTL, awscache.DefaultCleanupInterval),
ec2api,
subnetProvider,
pricingProvider,
capacityReservationProvider,
unavailableOfferingsCache,
instancetype.NewDefaultResolver(cfg.Region),
)
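// The instance provider wraps the EC2 API for instance lifecycle operations, using the
// subnet, launch template, and capacity reservation providers above.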
instanceProvider := instance.NewDefaultProvider(
ctx,
cfg.Region,
operator.EventRecorder,
ec2api,
unavailableOfferingsCache,
subnetProvider,
launchTemplateProvider,
capacityReservationProvider,
)
// Set up field indexers on instanceID, specifically for the interruption controller
if options.FromContext(ctx).InterruptionQueue != "" {
SetupIndexers(ctx, operator.Manager)
}
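// Wrap the core operator with the AWS clients, caches, and providers constructed above.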
return ctx, &Operator{
Operator: operator,
Config: cfg,
UnavailableOfferingsCache: unavailableOfferingsCache,
SSMCache: ssmCache,
ValidationCache: validationCache,
SubnetProvider: subnetProvider,
SecurityGroupProvider: securityGroupProvider,
InstanceProfileProvider: instanceProfileProvider,
AMIProvider: amiProvider,
AMIResolver: amiResolver,
VersionProvider: versionProvider,
LaunchTemplateProvider: launchTemplateProvider,
PricingProvider: pricingProvider,
InstanceTypesProvider: instanceTypeProvider,
InstanceProvider: instanceProvider,
SSMProvider: ssmProvider,
CapacityReservationProvider: capacityReservationProvider,
EC2API: ec2api,
}
}
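For orientation, a minimal sketch of how this constructor is typically chained from an entrypoint, inferred from the signature above. The import paths, the aliases (coreoperator, awsoperator), and the core NewOperator signature are assumptions, not taken from this file.

package main

import (
	awsoperator "github.com/aws/karpenter-provider-aws/pkg/operator" // path assumed
	coreoperator "sigs.k8s.io/karpenter/pkg/operator"                // path assumed
)

func main() {
	// The core operator sets up the manager, clients, and leader election;
	// the AWS NewOperator above layers its clients, caches, and providers on top.
	ctx, coreOp := coreoperator.NewOperator() // assumed to return (context.Context, *operator.Operator)
	ctx, awsOp := awsoperator.NewOperator(ctx, coreOp)
	_, _ = ctx, awsOp // awsOp exposes InstanceProvider, AMIProvider, etc. to the cloud provider and controllers
}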