in pkg/provider/ip/provider.go [85:192]
func (p *ipv4Provider) InitResource(instance ec2.EC2Instance) error {
nodeName := instance.Name()
eniManager := eni.NewENIManager(instance)
ipV4Resources, err := eniManager.InitResources(p.apiWrapper.EC2API)
if err != nil || ipV4Resources == nil {
if errors.Is(err, utils.ErrNotFound) {
msg := fmt.Sprintf("The instance type %s is not supported for Windows", instance.Type())
utils.SendNodeEventWithNodeName(p.apiWrapper.K8sAPI, instance.Name(), utils.UnsupportedInstanceTypeReason, msg, v1.EventTypeWarning, p.log)
}
return err
}
presentIPs := ipV4Resources.PrivateIPv4Addresses
presentPrefixes := ipV4Resources.IPv4Prefixes
presentIPSet := map[string]struct{}{}
for _, ip := range presentIPs {
presentIPSet[ip] = struct{}{}
}
pods, err := p.apiWrapper.PodAPI.GetRunningPodsOnNode(nodeName)
if err != nil {
return err
}
podToResourceMap := map[string]pool.Resource{}
usedIPSet := map[string]struct{}{}
// Construct map of all possible IPs to prefix for each assigned prefix
ipToPrefixMap := make(map[string]string)
usedPrefixSet := make(map[string]struct{})
for _, prefix := range presentPrefixes {
prefixIPs, err := utils.DeconstructIPsFromPrefix(prefix)
if err != nil {
p.log.Error(err, "failed to deconstruct prefix into IPs", "prefix", prefix)
continue
}
for _, ip := range prefixIPs {
ipToPrefixMap[ip] = prefix
}
}
for _, pod := range pods {
annotation, present := pod.Annotations[config.ResourceNameIPAddress]
if !present {
continue
}
// Only mark pod as used if it's secondary IP
if _, found := presentIPSet[annotation]; found {
podToResourceMap[string(pod.UID)] = pool.Resource{GroupID: annotation, ResourceID: annotation}
usedIPSet[annotation] = struct{}{}
} else {
// If running pod's IP is not secondary IP, ignore it
p.log.Info("ignoring non-secondary IP", "IPv4 address ", annotation)
if prefix, exist := ipToPrefixMap[annotation]; exist {
usedPrefixSet[prefix] = struct{}{}
}
}
}
warmIPs := difference(presentIPs, usedIPSet)
warmResources := make(map[string][]pool.Resource, len(presentIPs))
for _, ip := range warmIPs {
warmResources[ip] = append(warmResources[ip], pool.Resource{GroupID: ip, ResourceID: ip})
}
// Expected node capacity based on instance type in secondary IP mode
nodeCapacity := getCapacity(instance.Type(), instance.Os())
isPDEnabled := p.conditions.IsWindowsPrefixDelegationEnabled()
p.config = pool.GetWinWarmPoolConfig(p.log, p.apiWrapper, isPDEnabled)
// Set warm pool config to empty config if PD is enabled
secondaryIPWPConfig := p.config
if isPDEnabled {
secondaryIPWPConfig = &config.WarmPoolConfig{}
} else {
// Log the discrepancy between the advertised and the actual node capacity when it is in secondary IP mode
if len(usedPrefixSet) > 0 {
actualCapacity := nodeCapacity - len(usedPrefixSet)
p.log.Info("there could be discrepancy between advertised and actual node capacity due to existing pods from "+
"prefix delegation mode", "node name", instance.Name(), "advertised capacity", nodeCapacity,
"actual capacity", actualCapacity)
}
}
resourcePool := pool.NewResourcePool(p.log.WithName("secondary ipv4 address resource pool").
WithValues("node name", instance.Name()), secondaryIPWPConfig, podToResourceMap,
warmResources, instance.Name(), nodeCapacity, false)
p.putInstanceProviderAndPool(nodeName, resourcePool, eniManager, nodeCapacity, isPDEnabled)
p.log.Info("initialized the resource provider for secondary ipv4 address",
"capacity", nodeCapacity, "node name", nodeName, "instance type",
instance.Type(), "instance ID", instance.InstanceID())
// Reconcile pool after starting up and submit the async job
job := resourcePool.ReconcilePool()
if job.Operations != worker.OperationReconcileNotRequired {
p.SubmitAsyncJob(job)
}
// Submit the async job to periodically process the delete queue
p.SubmitAsyncJob(worker.NewWarmProcessDeleteQueueJob(nodeName))
return nil
}