in pkg/providers/subnet/subnet.go [184:240]
func (p *DefaultProvider) UpdateInflightIPs(createFleetInput *ec2.CreateFleetInput, createFleetOutput *ec2.CreateFleetOutput, instanceTypes []*cloudprovider.InstanceType,
subnets []*Subnet, capacityType string) {
p.Lock()
defer p.Unlock()
// Process the CreateFleetInput to pull out all the requested subnetIDs
fleetInputSubnets := lo.Compact(lo.Uniq(lo.FlatMap(createFleetInput.LaunchTemplateConfigs, func(req ec2types.FleetLaunchTemplateConfigRequest, _ int) []string {
return lo.Map(req.Overrides, func(override ec2types.FleetLaunchTemplateOverridesRequest, _ int) string {
return lo.FromPtr(override.SubnetId)
})
})))
// Process the CreateFleetOutput to pull out all the fulfilled subnetIDs
var fleetOutputSubnets []string
if createFleetOutput != nil {
fleetOutputSubnets = lo.Compact(lo.Uniq(lo.Map(createFleetOutput.Instances, func(fleetInstance ec2types.CreateFleetInstance, _ int) string {
if fleetInstance.LaunchTemplateAndOverrides == nil || fleetInstance.LaunchTemplateAndOverrides.Overrides == nil {
return ""
}
return lo.FromPtr(fleetInstance.LaunchTemplateAndOverrides.Overrides.SubnetId)
})))
}
// Find the subnets that were included in the input but not chosen by Fleet, so we need to add the inflight IPs back to them
subnetIDsToAddBackIPs, _ := lo.Difference(fleetInputSubnets, fleetOutputSubnets)
// Aggregate all the cached subnets ip address count
cachedAvailableIPAddressMap := lo.MapEntries(p.availableIPAddressCache.Items(), func(k string, v cache.Item) (string, int32) {
return k, v.Object.(int32)
})
// Update the inflight IP tracking of subnets stored in the cache that have not be synchronized since the initial
// deduction of IP addresses before the instance launch
for cachedSubnetID, cachedIPAddressCount := range cachedAvailableIPAddressMap {
if !lo.Contains(subnetIDsToAddBackIPs, cachedSubnetID) {
continue
}
originalSubnet, ok := lo.Find(subnets, func(subnet *Subnet) bool {
return subnet.ID == cachedSubnetID
})
if !ok {
continue
}
// If the cached subnet IP address count hasn't changed from the original subnet used to
// launch the instance, then we need to update the tracked IPs
if originalSubnet.AvailableIPAddressCount == cachedIPAddressCount {
// other IPs deducted were opportunistic and need to be readded since Fleet didn't pick those subnets to launch into
if ips, ok := p.inflightIPs[originalSubnet.ID]; ok {
minPods := p.minPods(instanceTypes, scheduling.NewRequirements(
scheduling.NewRequirement(karpv1.CapacityTypeLabelKey, corev1.NodeSelectorOpIn, capacityType),
scheduling.NewRequirement(corev1.LabelTopologyZone, corev1.NodeSelectorOpIn, originalSubnet.Zone),
))
p.inflightIPs[originalSubnet.ID] = ips + minPods
}
}
}
}