in pkg/provider/prefix/provider.go [87:193]
// InitResource initializes the IPv4 prefix resource pool for the given EC2
// instance and registers it with the provider.
//
// It discovers the prefixes and secondary private IPs already attached to the
// instance's ENIs, determines which prefix-deconstructed IPs are in use by
// running pods (via the pod IP annotation), builds the warm pool from the
// remaining IPs, registers the pool, and submits async jobs to reconcile the
// pool and to periodically drain the delete queue.
//
// Returns an error if the instance's ENI resources cannot be initialized
// (sending a node event when the instance type is unsupported for Windows),
// if no IPv4 resources are discovered, if running pods cannot be listed, or
// if an assigned prefix cannot be deconstructed into its candidate IPs.
func (p *ipv4PrefixProvider) InitResource(instance ec2.EC2Instance) error {
	nodeName := instance.Name()
	eniManager := eni.NewENIManager(instance)
	ipV4Resources, err := eniManager.InitResources(p.apiWrapper.EC2API)
	if err != nil {
		if errors.Is(err, utils.ErrNotFound) {
			// Surface a node event so operators can see why the provider
			// refused to initialize this node.
			msg := fmt.Sprintf("The instance type %s is not supported for Windows", instance.Type())
			utils.SendNodeEventWithNodeName(p.apiWrapper.K8sAPI, instance.Name(), utils.UnsupportedInstanceTypeReason, msg, v1.EventTypeWarning, p.log)
		}
		return err
	}
	if ipV4Resources == nil {
		// BUG FIX: previously this case fell through to `return err` with a
		// nil err, silently skipping pool initialization for the node. Fail
		// loudly instead so the caller can observe and retry.
		return fmt.Errorf("no IPv4 resources discovered on instance %s", instance.InstanceID())
	}

	presentPrefixes := ipV4Resources.IPv4Prefixes

	// Set of secondary private IPs assigned to the instance, used below to
	// detect pods still holding an IP handed out in secondary-IP mode.
	presentSecondaryIPSet := make(map[string]struct{})
	for _, ip := range ipV4Resources.PrivateIPv4Addresses {
		presentSecondaryIPSet[ip] = struct{}{}
	}

	pods, err := p.apiWrapper.PodAPI.GetRunningPodsOnNode(nodeName)
	if err != nil {
		return err
	}

	// Construct map of all possible IPs to prefix for each assigned prefix.
	warmResourceIDToGroup := map[string]string{}
	for _, prefix := range presentPrefixes {
		ips, err := utils.DeconstructIPsFromPrefix(prefix)
		if err != nil {
			return err
		}
		for _, ip := range ips {
			warmResourceIDToGroup[ip] = prefix
		}
	}

	podToResourceMap := make(map[string]pool.Resource)
	numberUsedSecondaryIP := 0
	for _, pod := range pods {
		annotation, present := pod.Annotations[config.ResourceNameIPAddress]
		if !present {
			continue
		}
		prefix, found := warmResourceIDToGroup[annotation]
		if found {
			// Store running pod into map of used resources.
			podToResourceMap[string(pod.UID)] = pool.Resource{GroupID: prefix, ResourceID: annotation}
			// Remove running pod's IP from warm resources.
			delete(warmResourceIDToGroup, annotation)
		} else {
			// If running pod's IP is not deconstructed from an assigned prefix
			// on the instance, ignore it; if it is one of the instance's
			// secondary IPs it was carried over from secondary-IP mode, so
			// count it toward the capacity discrepancy reported below.
			// NOTE(review): log key "IPv4 address " carries a trailing space;
			// kept byte-identical to preserve existing log output.
			p.log.Info("ignoring non-prefix deconstructed IP", "IPv4 address ", annotation)
			if _, exist := presentSecondaryIPSet[annotation]; exist {
				numberUsedSecondaryIP++
			}
		}
	}

	// Construct map of warm Resources: key is the prefix, value is the list of
	// still-unassigned Resources deconstructed from that prefix.
	warmResources := make(map[string][]pool.Resource)
	for ip, prefix := range warmResourceIDToGroup {
		warmResources[prefix] = append(warmResources[prefix], pool.Resource{GroupID: prefix, ResourceID: ip})
	}

	// Expected node capacity based on instance type in PD mode.
	nodeCapacity := getCapacity(instance.Type(), instance.Os()) * pool.NumIPv4AddrPerPrefix

	isPDEnabled := p.conditions.IsWindowsPrefixDelegationEnabled()
	p.config = pool.GetWinWarmPoolConfig(p.log, p.apiWrapper, isPDEnabled)

	// Set warm pool config to empty if PD is not enabled.
	prefixIPWPConfig := p.config
	if !isPDEnabled {
		prefixIPWPConfig = &config.WarmPoolConfig{}
	} else if numberUsedSecondaryIP > 0 {
		// Log the discrepancy between the advertised and the actual node
		// capacity when it is in PD mode.
		actualCapacity := (getCapacity(instance.Type(), instance.Os()) - numberUsedSecondaryIP) * pool.NumIPv4AddrPerPrefix
		p.log.Info("there could be discrepancy between advertised and actual node capacity due to existing pods from "+
			"secondary IP mode", "node name", instance.Name(), "advertised capacity", nodeCapacity,
			"actual capacity", actualCapacity)
	}

	resourcePool := pool.NewResourcePool(p.log.WithName("prefix ipv4 address resource pool").
		WithValues("node name", instance.Name()), prefixIPWPConfig, podToResourceMap,
		warmResources, instance.Name(), nodeCapacity, true)

	p.putInstanceProviderAndPool(nodeName, resourcePool, eniManager, nodeCapacity, isPDEnabled)

	p.log.Info("initialized the resource provider for ipv4 prefix",
		"capacity", nodeCapacity, "node name", nodeName, "instance type",
		instance.Type(), "instance ID", instance.InstanceID(), "warmPoolConfig", p.config)

	// Reconcile now so the warm pool converges toward the configured size.
	if job := resourcePool.ReconcilePool(); job.Operations != worker.OperationReconcileNotRequired {
		p.SubmitAsyncJob(job)
	}

	// Submit the async job to periodically process the delete queue.
	p.SubmitAsyncJob(worker.NewWarmProcessDeleteQueueJob(nodeName))

	return nil
}