func()

in pkg/openstack/loadbalancer.go [1938:2414]


// ensureLoadBalancer creates or reconciles the OpenStack load balancer that
// backs apiService and returns the ingress status for that Service.
//
// When lbaas.opts.UseOctavia is set, the call is delegated to
// ensureOctaviaLoadBalancer. Otherwise the legacy Neutron-LBaaS path runs:
//
//  1. Validate the request: at least one node, at least one port, TCP-only
//     ports, a supported session affinity, and source ranges only when
//     security groups are managed.
//  2. Resolve the subnet and, for non-internal services, the floating
//     network/subnet from annotations, the load balancer class and options.
//  3. Look up the load balancer by its name or legacy name, creating it when
//     it is not found.
//  4. For every Service port, ensure a listener, a pool, one pool member per
//     node, and (depending on configuration) a health monitor. Members and
//     listeners that no longer correspond to a node or port are deleted.
//  5. For non-internal services, reuse, attach or create a floating IP for
//     the load balancer's VIP port.
//  6. Reconcile security groups when lbaas.opts.ManageSecurityGroups is set.
//
// The returned status holds the floating IP address when one is in use and
// the load balancer VIP address otherwise. If security group reconciliation
// fails, the status is returned together with the error; every other error
// path returns a nil status.
func (lbaas *LbaasV2) ensureLoadBalancer(ctx context.Context, clusterName string, apiService *corev1.Service, nodes []*corev1.Node) (*corev1.LoadBalancerStatus, error) {
	serviceName := fmt.Sprintf("%s/%s", apiService.Namespace, apiService.Name)
	klog.InfoS("EnsureLoadBalancer", "cluster", clusterName, "service", klog.KObj(apiService))

	if lbaas.opts.UseOctavia {
		return lbaas.ensureOctaviaLoadBalancer(ctx, clusterName, apiService, nodes)
	}

	// Following code is just for legacy Neutron-LBaaS support which has been deprecated since OpenStack stable/queens
	// and not recommended using in production. No new features should be added.

	if len(nodes) == 0 {
		return nil, fmt.Errorf("there are no available nodes for LoadBalancer service %s", serviceName)
	}

	// NOTE(review): these assignments write the per-Service annotation values
	// back into the receiver's options, so they persist after this call
	// returns. lbaas.opts.SubnetID is read again below when creating members.
	// Confirm that carrying one Service's values over to later calls is intended.
	lbaas.opts.NetworkID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerNetworkID, lbaas.opts.NetworkID)
	lbaas.opts.SubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
	if len(lbaas.opts.SubnetID) == 0 && len(lbaas.opts.NetworkID) == 0 {
		// Get SubnetID automatically.
		// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
		subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
		if err != nil {
			klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
			return nil, fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
				"and failed to find subnet-id from OpenStack: %v", apiService.Namespace, apiService.Name, err)
		}
		lbaas.opts.SubnetID = subnetID
	}

	ports := apiService.Spec.Ports
	if len(ports) == 0 {
		return nil, fmt.Errorf("no ports provided to openstack load balancer")
	}

	internalAnnotation := getBoolFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerInternal, lbaas.opts.InternalLB)

	var lbClass *LBClass
	var floatingNetworkID string
	var floatingSubnetID string
	if !internalAnnotation {
		klog.V(4).Infof("Ensure an external loadbalancer service")

		class := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerClass, "")
		if class != "" {
			lbClass = lbaas.opts.LBClasses[class]
			if lbClass == nil {
				return nil, fmt.Errorf("invalid loadbalancer class %q", class)
			}

			klog.V(4).Infof("found loadbalancer class %q with %+v", class, lbClass)

			// read floating network id and floating subnet id from loadbalancer class
			if lbClass.FloatingNetworkID != "" {
				floatingNetworkID = lbClass.FloatingNetworkID
			}

			if lbClass.FloatingSubnetID != "" {
				floatingSubnetID = lbClass.FloatingSubnetID
			}
		}

		// Values from the load balancer class take precedence; fall back to
		// the annotation, then the configured option, then auto-discovery.
		if floatingNetworkID == "" {
			floatingNetworkID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingNetworkID, lbaas.opts.FloatingNetworkID)
			if floatingNetworkID == "" {
				var err error
				floatingNetworkID, err = openstackutil.GetFloatingNetworkID(lbaas.network)
				if err != nil {
					// Not fatal here: without a floating network no floating
					// IP is created further down.
					klog.Warningf("Failed to find floating-network-id for Service %s: %v", serviceName, err)
				}
			}
		}

		if floatingSubnetID == "" {
			floatingSubnetName := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingSubnet, "")
			if floatingSubnetName != "" {
				lbSubnet, err := lbaas.getSubnet(floatingSubnetName)
				if err != nil || lbSubnet == nil {
					klog.Warningf("Failed to find floating-subnet-id for Service %s: %v", serviceName, err)
				} else {
					floatingSubnetID = lbSubnet.ID
				}
			}

			if floatingSubnetID == "" {
				floatingSubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingSubnetID, lbaas.opts.FloatingSubnetID)
			}
		}

		// check subnets belongs to network
		if floatingNetworkID != "" && floatingSubnetID != "" {
			mc := metrics.NewMetricContext("subnet", "get")
			subnet, err := subnets.Get(lbaas.network, floatingSubnetID).Extract()
			if mc.ObserveRequest(err) != nil {
				return nil, fmt.Errorf("failed to find subnet %q: %v", floatingSubnetID, err)
			}

			if subnet.NetworkID != floatingNetworkID {
				return nil, fmt.Errorf("FloatingSubnet %q doesn't belong to FloatingNetwork %q", floatingSubnetID, floatingNetworkID)
			}
		}
	} else {
		klog.V(4).Infof("Ensure an internal loadbalancer service.")
	}

	// Check for TCP protocol on each port
	for _, port := range ports {
		if port.Protocol != corev1.ProtocolTCP {
			return nil, fmt.Errorf("only TCP LoadBalancer is supported for openstack load balancers")
		}
	}

	sourceRanges, err := GetLoadBalancerSourceRanges(apiService)
	if err != nil {
		return nil, fmt.Errorf("failed to get source ranges for loadbalancer service %s: %v", serviceName, err)
	}
	if !IsAllowAll(sourceRanges) && !lbaas.opts.ManageSecurityGroups {
		return nil, fmt.Errorf("source range restrictions are not supported for openstack load balancers without managing security groups")
	}

	affinity := apiService.Spec.SessionAffinity
	var persistence *v2pools.SessionPersistence
	switch affinity {
	case corev1.ServiceAffinityNone:
		persistence = nil
	case corev1.ServiceAffinityClientIP:
		persistence = &v2pools.SessionPersistence{Type: "SOURCE_IP"}
	default:
		return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
	}

	// Use more meaningful name for the load balancer but still need to check the legacy name for backward compatibility.
	name := lbaas.GetLoadBalancerName(ctx, clusterName, apiService)
	legacyName := lbaas.getLoadBalancerLegacyName(ctx, clusterName, apiService)
	loadbalancer, err := getLoadbalancerByName(lbaas.lb, name, legacyName)
	if err != nil {
		if err != cpoerrors.ErrNotFound {
			return nil, fmt.Errorf("error getting loadbalancer for Service %s: %v", serviceName, err)
		}

		klog.V(2).Infof("Creating loadbalancer %s", name)

		// The port ID annotation is only honoured when no load balancer class is used.
		portID := ""
		if lbClass == nil {
			portID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerPortID, "")
		}
		loadbalancer, err = lbaas.createLoadBalancer(apiService, name, clusterName, lbClass, internalAnnotation, portID)
		if err != nil {
			return nil, fmt.Errorf("error creating loadbalancer %s: %v", name, err)
		}
	} else {
		klog.V(2).Infof("LoadBalancer %s(%s) already exists", loadbalancer.Name, loadbalancer.ID)
	}

	if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
		return nil, err
	}

	oldListeners, err := openstackutil.GetListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
	if err != nil {
		return nil, fmt.Errorf("error getting LB %s listeners: %v", loadbalancer.Name, err)
	}
	// Index the existing listeners by protocol and port so each Service port
	// can be matched against what already exists.
	curListenerMapping := make(map[listenerKey]*listeners.Listener)
	for i, l := range oldListeners {
		key := listenerKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
		curListenerMapping[key] = &oldListeners[i]
	}

	// The connection limit is a Service-level annotation and therefore the
	// same for every port: parse it once. An unparsable value falls back to -1.
	climit := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerConnLimit, "-1")
	connLimit := -1
	if parsed, err := strconv.Atoi(climit); err != nil {
		klog.V(4).Infof("Could not parse int value from \"%s\" error \"%v\" failing back to default", climit, err)
	} else {
		connLimit = parsed
	}

	for portIndex, port := range ports {
		key := listenerKey{Protocol: listeners.Protocol(port.Protocol), Port: int(port.Port)}
		listener := curListenerMapping[key]

		if listener == nil {
			listenerProtocol := listeners.Protocol(port.Protocol)
			listenerCreateOpt := listeners.CreateOpts{
				Name:           cutString(fmt.Sprintf("listener_%d_%s", portIndex, name)),
				Protocol:       listenerProtocol,
				ProtocolPort:   int(port.Port),
				ConnLimit:      &connLimit,
				LoadbalancerID: loadbalancer.ID,
			}

			klog.V(4).Infof("Creating listener for port %d using protocol: %s", int(port.Port), listenerProtocol)

			listener, err = openstackutil.CreateListener(lbaas.lb, loadbalancer.ID, listenerCreateOpt)
			if err != nil {
				return nil, fmt.Errorf("failed to create listener for loadbalancer %s: %v", loadbalancer.ID, err)
			}

			klog.V(4).Infof("Listener %s created for loadbalancer %s", listener.ID, loadbalancer.ID)
		} else if connLimit != listener.ConnLimit {
			// The connection limit is the only listener attribute reconciled
			// on an existing listener.
			updateOpts := listeners.UpdateOpts{ConnLimit: &connLimit}
			if err := openstackutil.UpdateListener(lbaas.lb, loadbalancer.ID, listener.ID, updateOpts); err != nil {
				return nil, fmt.Errorf("failed to update listener %s of loadbalancer %s: %v", listener.ID, loadbalancer.ID, err)
			}

			klog.V(4).Infof("Listener %s updated for loadbalancer %s", listener.ID, loadbalancer.ID)
		}

		// After all ports have been processed, remaining listeners are removed as obsolete.
		// Pop valid listeners.
		oldListeners = popListener(oldListeners, listener.ID)

		pool, err := openstackutil.GetPoolByListener(lbaas.lb, loadbalancer.ID, listener.ID)
		if err != nil && err != openstackutil.ErrNotFound {
			return nil, fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
		}
		if pool == nil {
			// Use the protocol of the listener
			poolProto := v2pools.Protocol(listener.Protocol)

			lbmethod := v2pools.LBMethod(lbaas.opts.LBMethod)
			createOpt := v2pools.CreateOpts{
				Name:        cutString(fmt.Sprintf("pool_%d_%s", portIndex, name)),
				Protocol:    poolProto,
				LBMethod:    lbmethod,
				ListenerID:  listener.ID,
				Persistence: persistence,
			}

			klog.Infof("Creating pool for listener %s using protocol %s", listener.ID, poolProto)
			pool, err = openstackutil.CreatePool(lbaas.lb, createOpt, loadbalancer.ID)
			if err != nil {
				return nil, err
			}
		}

		klog.V(4).Infof("Pool created for listener %s: %s", listener.ID, pool.ID)

		members, err := openstackutil.GetMembersbyPool(lbaas.lb, pool.ID)
		if err != nil && !cpoerrors.IsNotFound(err) {
			return nil, fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
		}
		for _, node := range nodes {
			addr, err := nodeAddressForLB(node)
			if err != nil {
				if err == cpoerrors.ErrNotFound {
					// Node failure, do not create member
					klog.Warningf("Failed to create LB pool member for node %s: %v", node.Name, err)
					continue
				}
				return nil, fmt.Errorf("error getting address for node %s: %v", node.Name, err)
			}

			if !memberExists(members, addr, int(port.NodePort)) {
				klog.V(4).Infof("Creating member for pool %s", pool.ID)
				mc := metrics.NewMetricContext("loadbalancer_member", "create")
				_, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
					Name:         cutString(fmt.Sprintf("member_%d_%s_%s", portIndex, node.Name, name)),
					ProtocolPort: int(port.NodePort),
					Address:      addr,
					SubnetID:     lbaas.opts.SubnetID,
				}).Extract()
				if mc.ObserveRequest(err) != nil {
					return nil, fmt.Errorf("error creating LB pool member for node: %s, %v", node.Name, err)
				}

				if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
					return nil, err
				}
			} else {
				// After all members have been processed, remaining members are deleted as obsolete.
				members = popMember(members, addr, int(port.NodePort))
			}

			klog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, node.Name, addr)
		}

		// Delete obsolete members for this pool
		for _, member := range members {
			klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
			mc := metrics.NewMetricContext("loadbalancer_member", "delete")
			err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
			if err != nil && !cpoerrors.IsNotFound(err) {
				_ = mc.ObserveRequest(err)
				return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
			}
			// A not-found error means the member is already gone, which is
			// recorded as a successful request.
			_ = mc.ObserveRequest(nil)

			if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
				return nil, err
			}
		}

		monitorID := pool.MonitorID
		enableHealthMonitor := getBoolFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerEnableHealthMonitor, lbaas.opts.CreateMonitor)
		if monitorID == "" && enableHealthMonitor {
			klog.Infof("Creating monitor for pool %s", pool.ID)
			monitorProtocol := string(port.Protocol)
			// Non-TCP ports are rejected earlier in this function, so the UDP
			// case below is not reachable on this legacy path.
			if port.Protocol == corev1.ProtocolUDP {
				monitorProtocol = "UDP-CONNECT"
			}
			createOpts := v2monitors.CreateOpts{
				Name:       cutString(fmt.Sprintf("monitor_%d_%s", portIndex, name)),
				PoolID:     pool.ID,
				Type:       monitorProtocol,
				Delay:      int(lbaas.opts.MonitorDelay.Duration.Seconds()),
				Timeout:    int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
				MaxRetries: int(lbaas.opts.MonitorMaxRetries),
			}
			monitor, err := openstackutil.CreateHealthMonitor(lbaas.lb, createOpts, loadbalancer.ID)
			if err != nil {
				return nil, err
			}
			monitorID = monitor.ID
		} else if monitorID != "" && !enableHealthMonitor {
			klog.Infof("Deleting health monitor %s for pool %s", monitorID, pool.ID)
			mc := metrics.NewMetricContext("loadbalancer_healthmonitor", "delete")
			err := v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
			if mc.ObserveRequest(err) != nil {
				return nil, fmt.Errorf("failed to delete health monitor %s for pool %s, error: %v", monitorID, pool.ID, err)
			}
		}
	}

	// All remaining listeners are obsolete, delete
	for _, listener := range oldListeners {
		klog.V(4).Infof("Deleting obsolete listener %s:", listener.ID)
		// get pool for listener
		pool, err := openstackutil.GetPoolByListener(lbaas.lb, loadbalancer.ID, listener.ID)
		if err != nil && err != openstackutil.ErrNotFound {
			return nil, fmt.Errorf("error getting pool for obsolete listener %s: %v", listener.ID, err)
		}
		if pool != nil {
			// get and delete monitor
			monitorID := pool.MonitorID
			if monitorID != "" {
				klog.Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID)
				if err := openstackutil.DeleteHealthMonitor(lbaas.lb, monitorID, loadbalancer.ID); err != nil {
					return nil, err
				}
			}
			// get and delete pool members
			members, err := openstackutil.GetMembersbyPool(lbaas.lb, pool.ID)
			if err != nil && !cpoerrors.IsNotFound(err) {
				return nil, fmt.Errorf("error getting members for pool %s: %v", pool.ID, err)
			}
			for _, member := range members {
				klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
				mc := metrics.NewMetricContext("loadbalancer_member", "delete")
				err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
				if err != nil && !cpoerrors.IsNotFound(err) {
					_ = mc.ObserveRequest(err)
					return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
				}
				_ = mc.ObserveRequest(nil)

				if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
					return nil, err
				}
			}
			klog.Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID)
			// delete pool
			if err := openstackutil.DeletePool(lbaas.lb, pool.ID, loadbalancer.ID); err != nil {
				return nil, err
			}
		}
		// delete listener
		if err := openstackutil.DeleteListener(lbaas.lb, listener.ID, loadbalancer.ID); err != nil {
			return nil, err
		}
		klog.Infof("Deleted obsolete listener: %s", listener.ID)
	}

	// Priority of choosing VIP port floating IP:
	// 1. The floating IP that is already attached to the VIP port.
	// 2. Floating IP specified in Spec.LoadBalancerIP
	// 3. Create a new one
	var floatIP *floatingips.FloatingIP
	if !internalAnnotation {

		// first attempt: fetch floating IP attached to load balancer's VIP port
		portID := loadbalancer.VipPortID
		floatIP, err = openstackutil.GetFloatingIPByPortID(lbaas.network, portID)
		if err != nil {
			return nil, fmt.Errorf("failed when getting floating IP for port %s: %v", portID, err)
		}
		// floatIP is nil when nothing is attached yet; only report a hit.
		if floatIP != nil {
			klog.V(4).Infof("Found floating IP %q by loadbalancer port ID %q", floatIP.FloatingIP, portID)
		}

		// second attempt: fetch floating IP specified in service Spec.LoadBalancerIP
		// if found, associate floating IP with loadbalancer's VIP port
		loadBalancerIP := apiService.Spec.LoadBalancerIP
		if floatIP == nil && loadBalancerIP != "" {

			opts := floatingips.ListOpts{
				FloatingIP: loadBalancerIP,
			}
			existingIPs, err := openstackutil.GetFloatingIPs(lbaas.network, opts)
			if err != nil {
				return nil, fmt.Errorf("failed when trying to get existing floating IP %s, error: %v", loadBalancerIP, err)
			}
			klog.V(4).Infof("Found floating IPs %v by loadbalancer IP %q", existingIPs, loadBalancerIP)

			if len(existingIPs) > 0 {
				floatingip := existingIPs[0]
				// A floating IP that is already bound to a port cannot be reused.
				if len(floatingip.PortID) != 0 {
					return nil, fmt.Errorf("floating IP %s is not available", loadBalancerIP)
				}

				floatUpdateOpts := floatingips.UpdateOpts{
					PortID: &portID,
				}
				klog.V(4).Infof("Attaching floating IP %q to loadbalancer port %q", floatingip.FloatingIP, portID)
				mc := metrics.NewMetricContext("floating_ip", "update")
				floatIP, err = floatingips.Update(lbaas.network, floatingip.ID, floatUpdateOpts).Extract()
				if mc.ObserveRequest(err) != nil {
					return nil, fmt.Errorf("error updating LB floating IP %+v: %v", floatUpdateOpts, err)
				}
			}
		}

		// third attempt: create a new floating IP
		if floatIP == nil {
			if floatingNetworkID != "" {
				klog.V(4).Infof("Creating floating IP %s for loadbalancer %s", loadBalancerIP, loadbalancer.ID)
				floatIPOpts := floatingips.CreateOpts{
					FloatingNetworkID: floatingNetworkID,
					PortID:            portID,
					Description:       fmt.Sprintf("Floating IP for Kubernetes external service %s from cluster %s", serviceName, clusterName),
				}

				if floatingSubnetID != "" {
					floatIPOpts.SubnetID = floatingSubnetID
				}

				if loadBalancerIP != "" {
					floatIPOpts.FloatingIP = loadBalancerIP
				}

				klog.V(4).Infof("creating floating IP with opts %+v", floatIPOpts)
				mc := metrics.NewMetricContext("floating_ip", "create")
				floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
				if mc.ObserveRequest(err) != nil {
					return nil, fmt.Errorf("error creating LB floating IP %+v: %v", floatIPOpts, err)
				}
			} else {
				klog.Warningf("Failed to find floating network information, for Service %s,"+
					"forcing to ensure an internal load balancer service", serviceName)
			}
		}
	}

	status := &corev1.LoadBalancerStatus{}

	// Publish the floating IP when there is one, otherwise the VIP address.
	if floatIP != nil {
		status.Ingress = []corev1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
	} else {
		status.Ingress = []corev1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
	}

	if lbaas.opts.ManageSecurityGroups {
		err := lbaas.ensureSecurityGroup(clusterName, apiService, nodes, loadbalancer)
		if err != nil {
			// The status is returned alongside the error on this path.
			return status, fmt.Errorf("failed when reconciling security groups for LB service %v/%v: %v", apiService.Namespace, apiService.Name, err)
		}
	}

	return status, nil
}