in pkg/openstack/loadbalancer.go [1775:1929]
func (lbaas *LbaasV2) ensureOctaviaLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) (lbs *corev1.LoadBalancerStatus, err error) {
svcConf := new(serviceConfig)
// Update the Service annotations (e.g. add loadbalancer.openstack.org/load-balancer-id) at the end if they don't exist yet.
patcher := newServicePatcher(lbaas.kclient, service)
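// The patch is applied in a defer so that it runs on every return path,
// reading the final value of the named err return.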
defer func() { err = patcher.Patch(ctx, err) }()
if err := lbaas.checkService(service, nodes, svcConf); err != nil {
return nil, err
}
// Use a more meaningful name for the load balancer, but still check the legacy name for backward compatibility.
lbName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
svcConf.lbName = lbName
serviceName := fmt.Sprintf("%s/%s", service.Namespace, service.Name)
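// isLBOwner records whether this Service created (and therefore owns) the LB.
// createNewLB records whether a new LB is created during this reconciliation.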
var loadbalancer *loadbalancers.LoadBalancer
isLBOwner := false
createNewLB := false
// Check for a load balancer ID in the Service annotation.
if svcConf.lbID != "" {
loadbalancer, err = openstackutil.GetLoadbalancerByID(lbaas.lb, svcConf.lbID)
if err != nil {
return nil, fmt.Errorf("failed to get load balancer %s: %v", svcConf.lbID, err)
}
// If the LB name matches the default generated name, the Service 'owns' the LB, but the
// LB may still be shared by other Services.
// If the names don't match, this is an existing LB that this Service wants to attach to.
if loadbalancer.Name == lbName {
isLBOwner = true
}
// A shared LB can only be supported when the tag feature is available in Octavia.
if !svcConf.supportLBTags && !isLBOwner {
return nil, fmt.Errorf("shared load balancer is only supported with the tag feature in the cloud load balancer service")
}
// The load balancer can only be shared by at most the configured number of Services.
if svcConf.supportLBTags {
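// Each Service sharing this LB is recorded as a tag carrying the service prefix,
// so counting those tags gives the number of Services currently sharing the LB.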
sharedCount := 0
for _, tag := range loadbalancer.Tags {
if strings.HasPrefix(tag, servicePrefix) {
sharedCount++
}
}
if !isLBOwner && !cpoutil.Contains(loadbalancer.Tags, lbName) && sharedCount+1 > lbaas.opts.MaxSharedLB {
return nil, fmt.Errorf("load balancer %s already shared with %d Services", loadbalancer.ID, sharedCount)
}
}
} else {
legacyName := lbaas.getLoadBalancerLegacyName(ctx, clusterName, service)
loadbalancer, err = getLoadbalancerByName(lbaas.lb, lbName, legacyName)
if err != nil {
if err != cpoerrors.ErrNotFound {
return nil, fmt.Errorf("error getting loadbalancer for Service %s: %v", serviceName, err)
}
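// No existing LB was found, so create one. A fully populated creation provisions
// the LB together with its listeners, pools, members and health monitors in a
// single Octavia API call.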
klog.InfoS("Creating fully populated loadbalancer", "lbName", lbName, "service", klog.KObj(service))
loadbalancer, err = lbaas.createFullyPopulatedOctaviaLoadBalancer(lbName, clusterName, service, nodes, svcConf)
if err != nil {
return nil, fmt.Errorf("error creating loadbalancer %s: %v", lbName, err)
}
createNewLB = true
} else {
// This is a Service that was created before shared LBs were supported.
isLBOwner = true
}
}
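// Octavia only accepts modifications while the LB is ACTIVE, so return an error
// here and let a subsequent reconciliation retry.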
if loadbalancer.ProvisioningStatus != activeStatus {
return nil, fmt.Errorf("load balancer %s is not ACTIVE, current provisioning status: %s", loadbalancer.ID, loadbalancer.ProvisioningStatus)
}
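// Fetch the listeners currently attached to the LB so they can be reconciled
// against the Service ports below.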
loadbalancer.Listeners, err = openstackutil.GetListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, err
}
klog.V(4).InfoS("Load balancer ensured", "lbID", loadbalancer.ID, "isLBOwner", isLBOwner, "createNewLB", createNewLB)
// This is an existing load balancer, either created by occm for other Services or by the user outside of the cluster.
if !createNewLB {
curListeners := loadbalancer.Listeners
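// Index the existing listeners by (protocol, port) so each Service port can be
// matched against them.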
curListenerMapping := make(map[listenerKey]*listeners.Listener)
for i, l := range curListeners {
key := listenerKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
curListenerMapping[key] = &curListeners[i]
}
klog.V(4).InfoS("Existing listeners", "portProtocolMapping", curListenerMapping)
// Check port conflicts
if err := lbaas.checkListenerPorts(service, curListenerMapping, isLBOwner, lbName); err != nil {
return nil, err
}
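// Reconcile a listener, a pool and a health monitor for each port exposed by the Service.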
for portIndex, port := range service.Spec.Ports {
listener, err := lbaas.ensureOctaviaListener(loadbalancer.ID, cutString(fmt.Sprintf("listener_%d_%s", portIndex, lbName)), curListenerMapping, port, svcConf, service)
if err != nil {
return nil, err
}
// Remove the reconciled listener from curListeners; whatever remains after all
// ports have been processed is a candidate for deletion below.
curListeners = popListener(curListeners, listener.ID)
pool, err := lbaas.ensureOctaviaPool(loadbalancer.ID, cutString(fmt.Sprintf("pool_%d_%s", portIndex, lbName)), listener, service, port, nodes, svcConf)
if err != nil {
return nil, err
}
if err := lbaas.ensureOctaviaHealthMonitor(loadbalancer.ID, cutString(fmt.Sprintf("monitor_%d_%s", portIndex, lbName)), pool, port, svcConf); err != nil {
return nil, err
}
}
// Delete the remaining listeners, but only those that were previously created by this Service.
if err := lbaas.deleteOctaviaListeners(loadbalancer.ID, curListeners, isLBOwner, lbName); err != nil {
return nil, err
}
}
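// Determine the address to publish in the LoadBalancer status of the Service.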
addr, err := lbaas.getServiceAddress(clusterName, service, loadbalancer, svcConf)
if err != nil {
return nil, err
}
// Record the LB ID in a Service annotation and add the LB name to the load balancer tags.
lbaas.updateServiceAnnotation(service, ServiceAnnotationLoadBalancerID, loadbalancer.ID)
if svcConf.supportLBTags {
lbTags := loadbalancer.Tags
if !cpoutil.Contains(lbTags, lbName) {
lbTags = append(lbTags, lbName)
klog.InfoS("Updating load balancer tags", "lbID", loadbalancer.ID, "tags", lbTags)
if err := openstackutil.UpdateLoadBalancerTags(lbaas.lb, loadbalancer.ID, lbTags); err != nil {
return nil, err
}
}
}
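// Publish the load balancer address in the Service status.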
status := &corev1.LoadBalancerStatus{
Ingress: []corev1.LoadBalancerIngress{{IP: addr}},
}
// If the load balancer is using the PROXY protocol, expose its IP address via
// the Hostname field instead. This prevents kube-proxy from short-circuiting
// traffic to the LB IP inside the cluster via iptables, which would bypass the
// load balancer and strip the PROXY protocol header.
// This is a workaround until
// https://github.com/kubernetes/enhancements/tree/master/keps/sig-network/1860-kube-proxy-IP-node-binding
// is implemented (maybe in v1.22).
if svcConf.enableProxyProtocol && lbaas.opts.EnableIngressHostname {
fakeHostname := fmt.Sprintf("%s.%s", status.Ingress[0].IP, lbaas.opts.IngressHostnameSuffix)
status.Ingress = []corev1.LoadBalancerIngress{{Hostname: fakeHostname}}
}
return status, nil
}