func()

in pkg/provider/ip/provider.go [85:192]


// InitResource initializes the secondary IPv4 address resource pool for the
// given instance and registers it with the provider.
//
// It performs the following steps:
//  1. Initializes an ENI manager for the instance and reads the private IPv4
//     addresses and IPv4 prefixes it reports.
//  2. Lists the running pods on the node and, using each pod's
//     config.ResourceNameIPAddress annotation, splits the present IPs into
//     "used" (annotated on a running pod) and "warm" (everything else).
//  3. Builds the resource pool, stores it together with the ENI manager, and
//     submits the initial reconcile job (when one is required) plus the job
//     that processes the delete queue.
//
// It returns the error from ENI manager initialization or from listing pods.
// When the ENI manager reports no resources without reporting an error, an
// explicit error is returned instead of nil so that the caller does not treat
// an uninitialized node as initialized.
func (p *ipv4Provider) InitResource(instance ec2.EC2Instance) error {
	nodeName := instance.Name()

	eniManager := eni.NewENIManager(instance)
	ipV4Resources, err := eniManager.InitResources(p.apiWrapper.EC2API)
	if err != nil {
		// ErrNotFound is surfaced to the user as a node event stating that
		// the instance type is not supported.
		if errors.Is(err, utils.ErrNotFound) {
			msg := fmt.Sprintf("The instance type %s is not supported for Windows", instance.Type())
			utils.SendNodeEventWithNodeName(p.apiWrapper.K8sAPI, instance.Name(), utils.UnsupportedInstanceTypeReason, msg, v1.EventTypeWarning, p.log)
		}

		return err
	}
	if ipV4Resources == nil {
		// Without this check a (nil, nil) result would make the method return
		// nil even though no pool was created and no job was submitted.
		return fmt.Errorf("initializing ENI resources for node %s: no IPv4 resources returned", nodeName)
	}

	presentIPs := ipV4Resources.PrivateIPv4Addresses
	presentPrefixes := ipV4Resources.IPv4Prefixes

	// Set of secondary IPs currently present, for O(1) membership checks.
	presentIPSet := make(map[string]struct{}, len(presentIPs))
	for _, ip := range presentIPs {
		presentIPSet[ip] = struct{}{}
	}

	pods, err := p.apiWrapper.PodAPI.GetRunningPodsOnNode(nodeName)
	if err != nil {
		return err
	}

	podToResourceMap := map[string]pool.Resource{}
	usedIPSet := map[string]struct{}{}

	// Construct map of all possible IPs to prefix for each assigned prefix
	ipToPrefixMap := make(map[string]string)
	usedPrefixSet := make(map[string]struct{})
	for _, prefix := range presentPrefixes {
		prefixIPs, err := utils.DeconstructIPsFromPrefix(prefix)
		if err != nil {
			// Best effort: a prefix that cannot be expanded is skipped, so
			// pods using its IPs are not counted in usedPrefixSet.
			p.log.Error(err, "failed to deconstruct prefix into IPs", "prefix", prefix)
			continue
		}
		for _, ip := range prefixIPs {
			ipToPrefixMap[ip] = prefix
		}
	}

	for _, pod := range pods {
		annotation, present := pod.Annotations[config.ResourceNameIPAddress]
		if !present {
			continue
		}
		// Only mark pod as used if it's secondary IP
		if _, found := presentIPSet[annotation]; found {
			podToResourceMap[string(pod.UID)] = pool.Resource{GroupID: annotation, ResourceID: annotation}
			usedIPSet[annotation] = struct{}{}
		} else {
			// If running pod's IP is not secondary IP, ignore it
			p.log.Info("ignoring non-secondary IP", "IPv4 address ", annotation)
			// Remember the prefix the IP belongs to (if any); it is only used
			// below to report a possible capacity discrepancy.
			if prefix, exist := ipToPrefixMap[annotation]; exist {
				usedPrefixSet[prefix] = struct{}{}
			}
		}
	}

	// Every present secondary IP that is not annotated on a running pod is warm.
	warmIPs := difference(presentIPs, usedIPSet)
	warmResources := make(map[string][]pool.Resource, len(warmIPs))
	for _, ip := range warmIPs {
		warmResources[ip] = append(warmResources[ip], pool.Resource{GroupID: ip, ResourceID: ip})
	}

	// Expected node capacity based on instance type in secondary IP mode
	nodeCapacity := getCapacity(instance.Type(), instance.Os())

	isPDEnabled := p.conditions.IsWindowsPrefixDelegationEnabled()
	p.config = pool.GetWinWarmPoolConfig(p.log, p.apiWrapper, isPDEnabled)

	// Set warm pool config to empty config if PD is enabled
	secondaryIPWPConfig := p.config
	if isPDEnabled {
		secondaryIPWPConfig = &config.WarmPoolConfig{}
	} else if len(usedPrefixSet) > 0 {
		// Log the discrepancy between the advertised and the actual node capacity when it is in secondary IP mode
		actualCapacity := nodeCapacity - len(usedPrefixSet)
		p.log.Info("there could be discrepancy between advertised and actual node capacity due to existing pods from "+
			"prefix delegation mode", "node name", instance.Name(), "advertised capacity", nodeCapacity,
			"actual capacity", actualCapacity)
	}

	resourcePool := pool.NewResourcePool(p.log.WithName("secondary ipv4 address resource pool").
		WithValues("node name", instance.Name()), secondaryIPWPConfig, podToResourceMap,
		warmResources, instance.Name(), nodeCapacity, false)

	p.putInstanceProviderAndPool(nodeName, resourcePool, eniManager, nodeCapacity, isPDEnabled)

	p.log.Info("initialized the resource provider for secondary ipv4 address",
		"capacity", nodeCapacity, "node name", nodeName, "instance type",
		instance.Type(), "instance ID", instance.InstanceID())

	// Reconcile pool after starting up and submit the async job
	job := resourcePool.ReconcilePool()
	if job.Operations != worker.OperationReconcileNotRequired {
		p.SubmitAsyncJob(job)
	}

	// Submit the async job to periodically process the delete queue
	p.SubmitAsyncJob(worker.NewWarmProcessDeleteQueueJob(nodeName))
	return nil
}