func()

in pkg/openstack/loadbalancer.go [1775:1929]


func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) (lbs *corev1.LoadBalancerStatus, err error) {
	svcConf := new(serviceConfig)

	// Update the service annotations(e.g. add loadbalancer.openstack.org/load-balancer-id) in the end if it doesn't exist.
	patcher := newServicePatcher(lbaas.kclient, service)
	defer func() { err = patcher.Patch(ctx, err) }()

	if err := lbaas.checkService(service, nodes, svcConf); err != nil {
		return nil, err
	}

	// Use more meaningful name for the load balancer but still need to check the legacy name for backward compatibility.
	lbName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
	svcConf.lbName = lbName
	serviceName := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
	var loadbalancer *loadbalancers.LoadBalancer
	isLBOwner := false
	createNewLB := false

	// Check the load balancer in the Service annotation.
	if svcConf.lbID != "" {
		loadbalancer, err = openstackutil.GetLoadbalancerByID(lbaas.lb, svcConf.lbID)
		if err != nil {
			return nil, fmt.Errorf("failed to get load balancer %s: %v", svcConf.lbID, err)
		}

		// If this LB name matches the default generated name, the Service 'owns' the LB, but it's also possible for this
		// LB to be shared by other Services.
		// If the names don't match, this is a LB this Service wants to attach.
		if loadbalancer.Name == lbName {
			isLBOwner = true
		}

		// Shared LB can only be supported when the Tag feature is available in Octavia.
		if !svcConf.supportLBTags && !isLBOwner {
			return nil, fmt.Errorf("shared load balancer is only supported with the tag feature in the cloud load balancer service")
		}

		// The load balancer can only be shared with the configured number of Services.
		if svcConf.supportLBTags {
			sharedCount := 0
			for _, tag := range loadbalancer.Tags {
				if strings.HasPrefix(tag, servicePrefix) {
					sharedCount++
				}
			}
			if !isLBOwner && !cpoutil.Contains(loadbalancer.Tags, lbName) && sharedCount+1 > lbaas.opts.MaxSharedLB {
				return nil, fmt.Errorf("load balancer %s already shared with %d Services", loadbalancer.ID, sharedCount)
			}
		}
	} else {
		legacyName := lbaas.getLoadBalancerLegacyName(ctx, clusterName, service)
		loadbalancer, err = getLoadbalancerByName(lbaas.lb, lbName, legacyName)
		if err != nil {
			if err != cpoerrors.ErrNotFound {
				return nil, fmt.Errorf("error getting loadbalancer for Service %s: %v", serviceName, err)
			}

			klog.InfoS("Creating fully populated loadbalancer", "lbName", lbName, "service", klog.KObj(service))
			loadbalancer, err = lbaas.createFullyPopulatedOctaviaLoadBalancer(lbName, clusterName, service, nodes, svcConf)
			if err != nil {
				return nil, fmt.Errorf("error creating loadbalancer %s: %v", lbName, err)
			}
			createNewLB = true
		} else {
			// This is a Service created before shared LB is supported.
			isLBOwner = true
		}
	}

	if loadbalancer.ProvisioningStatus != activeStatus {
		return nil, fmt.Errorf("load balancer %s is not ACTIVE, current provisioning status: %s", loadbalancer.ID, loadbalancer.ProvisioningStatus)
	}

	loadbalancer.Listeners, err = openstackutil.GetListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
	if err != nil {
		return nil, err
	}

	klog.V(4).InfoS("Load balancer ensured", "lbID", loadbalancer.ID, "isLBOwner", isLBOwner, "createNewLB", createNewLB)

	// This is an existing load balancer, either created by occm for other Services or by the user outside of cluster.
	if !createNewLB {
		curListeners := loadbalancer.Listeners
		curListenerMapping := make(map[listenerKey]*listeners.Listener)
		for i, l := range curListeners {
			key := listenerKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
			curListenerMapping[key] = &curListeners[i]
		}
		klog.V(4).InfoS("Existing listeners", "portProtocolMapping", curListenerMapping)

		// Check port conflicts
		if err := lbaas.checkListenerPorts(service, curListenerMapping, isLBOwner, lbName); err != nil {
			return nil, err
		}

		for portIndex, port := range service.Spec.Ports {
			listener, err := lbaas.ensureOctaviaListener(loadbalancer.ID, cutString(fmt.Sprintf("listener_%d_%s", portIndex, lbName)), curListenerMapping, port, svcConf, service)
			if err != nil {
				return nil, err
			}

			// After all ports have been processed, remaining listeners are removed if they were created by this Service.
			curListeners = popListener(curListeners, listener.ID)

			pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cutString(fmt.Sprintf("pool_%d_%s", portIndex, lbName)), listener, service, port, nodes, svcConf)
			if err != nil {
				return nil, err
			}

			if err := lbaas.ensureOctaviaHealthMonitor(loadbalancer.ID, cutString(fmt.Sprintf("monitor_%d_%s", portIndex, lbName)), pool, port, svcConf); err != nil {
				return nil, err
			}
		}

		// Deal with the remaining listeners, delete the listener if it was created by this Service previously.
		if err := lbaas.deleteOctaviaListeners(loadbalancer.ID, curListeners, isLBOwner, lbName); err != nil {
			return nil, err
		}
	}

	addr, err := lbaas.getServiceAddress(clusterName, service, loadbalancer, svcConf)
	if err != nil {
		return nil, err
	}

	// Add annotation to Service and add LB name to load balancer tags.
	lbaas.updateServiceAnnotation(service, ServiceAnnotationLoadBalancerID, loadbalancer.ID)
	if svcConf.supportLBTags {
		lbTags := loadbalancer.Tags
		if !cpoutil.Contains(lbTags, lbName) {
			lbTags = append(lbTags, lbName)
			klog.InfoS("Updating load balancer tags", "lbID", loadbalancer.ID, "tags", lbTags)
			if err := openstackutil.UpdateLoadBalancerTags(lbaas.lb, loadbalancer.ID, lbTags); err != nil {
				return nil, err
			}
		}
	}

	status := &corev1.LoadBalancerStatus{
		Ingress: []corev1.LoadBalancerIngress{{IP: addr}},
	}

	// If the load balancer is using the PROXY protocol, expose its IP address via
	// the Hostname field to prevent kube-proxy from injecting an iptables bypass.
	// This is a workaround until
	// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1860-kube-proxy-IP-node-binding
	// is implemented (maybe in v1.22).
	if svcConf.enableProxyProtocol && lbaas.opts.EnableIngressHostname {
		fakeHostname := fmt.Sprintf("%s.%s", status.Ingress[0].IP, lbaas.opts.IngressHostnameSuffix)
		status.Ingress = []corev1.LoadBalancerIngress{{Hostname: fakeHostname}}
	}

	return status, nil
}