in pkg/openstack/loadbalancer.go [1938:2414]
func (lbaas *LbaasV2) ensureLoadBalancer(ctx context.Context, clusterName string, apiService *corev1.Service, nodes []*corev1.Node) (*corev1.LoadBalancerStatus, error) {
serviceName := fmt.Sprintf("%s/%s", apiService.Namespace, apiService.Name)
klog.InfoS("EnsureLoadBalancer", "cluster", clusterName, "service", klog.KObj(apiService))
if lbaas.opts.UseOctavia {
return lbaas.ensureOctaviaLoadBalancer(ctx, clusterName, apiService, nodes)
}
// Following code is just for legacy Neutron-LBaaS support which has been deprecated since OpenStack stable/queens
// and not recommended using in production. No new features should be added.
if len(nodes) == 0 {
return nil, fmt.Errorf("there are no available nodes for LoadBalancer service %s", serviceName)
}
lbaas.opts.NetworkID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerNetworkID, lbaas.opts.NetworkID)
lbaas.opts.SubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
if len(lbaas.opts.SubnetID) == 0 && len(lbaas.opts.NetworkID) == 0 {
// Get SubnetID automatically.
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
if err != nil {
klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
return nil, fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
"and failed to find subnet-id from OpenStack: %v", apiService.Namespace, apiService.Name, err)
}
lbaas.opts.SubnetID = subnetID
}
ports := apiService.Spec.Ports
if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to openstack load balancer")
}
internalAnnotation := getBoolFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerInternal, lbaas.opts.InternalLB)
var lbClass *LBClass
var floatingNetworkID string
var floatingSubnetID string
if !internalAnnotation {
klog.V(4).Infof("Ensure an external loadbalancer service")
class := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerClass, "")
if class != "" {
lbClass = lbaas.opts.LBClasses[class]
if lbClass == nil {
return nil, fmt.Errorf("invalid loadbalancer class %q", class)
}
klog.V(4).Infof("found loadbalancer class %q with %+v", class, lbClass)
// read floating network id and floating subnet id from loadbalancer class
if lbClass.FloatingNetworkID != "" {
floatingNetworkID = lbClass.FloatingNetworkID
}
if lbClass.FloatingSubnetID != "" {
floatingSubnetID = lbClass.FloatingSubnetID
}
}
if floatingNetworkID == "" {
floatingNetworkID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingNetworkID, lbaas.opts.FloatingNetworkID)
if floatingNetworkID == "" {
var err error
floatingNetworkID, err = openstackutil.GetFloatingNetworkID(lbaas.network)
if err != nil {
klog.Warningf("Failed to find floating-network-id for Service %s: %v", serviceName, err)
}
}
}
if floatingSubnetID == "" {
floatingSubnetName := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingSubnet, "")
if floatingSubnetName != "" {
lbSubnet, err := lbaas.getSubnet(floatingSubnetName)
if err != nil || lbSubnet == nil {
klog.Warningf("Failed to find floating-subnet-id for Service %s: %v", serviceName, err)
} else {
floatingSubnetID = lbSubnet.ID
}
}
if floatingSubnetID == "" {
floatingSubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingSubnetID, lbaas.opts.FloatingSubnetID)
}
}
// check subnets belongs to network
if floatingNetworkID != "" && floatingSubnetID != "" {
mc := metrics.NewMetricContext("subnet", "get")
subnet, err := subnets.Get(lbaas.network, floatingSubnetID).Extract()
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("failed to find subnet %q: %v", floatingSubnetID, err)
}
if subnet.NetworkID != floatingNetworkID {
return nil, fmt.Errorf("FloatingSubnet %q doesn't belong to FloatingNetwork %q", floatingSubnetID, floatingSubnetID)
}
}
} else {
klog.V(4).Infof("Ensure an internal loadbalancer service.")
}
// Check for TCP protocol on each port
for _, port := range ports {
if port.Protocol != corev1.ProtocolTCP {
return nil, fmt.Errorf("only TCP LoadBalancer is supported for openstack load balancers")
}
}
sourceRanges, err := GetLoadBalancerSourceRanges(apiService)
if err != nil {
return nil, fmt.Errorf("failed to get source ranges for loadbalancer service %s: %v", serviceName, err)
}
if !IsAllowAll(sourceRanges) && !lbaas.opts.ManageSecurityGroups {
return nil, fmt.Errorf("source range restrictions are not supported for openstack load balancers without managing security groups")
}
affinity := apiService.Spec.SessionAffinity
var persistence *v2pools.SessionPersistence
switch affinity {
case corev1.ServiceAffinityNone:
persistence = nil
case corev1.ServiceAffinityClientIP:
persistence = &v2pools.SessionPersistence{Type: "SOURCE_IP"}
default:
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
}
// Use more meaningful name for the load balancer but still need to check the legacy name for backward compatibility.
name := lbaas.GetLoadBalancerName(ctx, clusterName, apiService)
legacyName := lbaas.getLoadBalancerLegacyName(ctx, clusterName, apiService)
loadbalancer, err := getLoadbalancerByName(lbaas.lb, name, legacyName)
if err != nil {
if err != cpoerrors.ErrNotFound {
return nil, fmt.Errorf("error getting loadbalancer for Service %s: %v", serviceName, err)
}
klog.V(2).Infof("Creating loadbalancer %s", name)
portID := ""
if lbClass == nil {
portID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerPortID, "")
}
loadbalancer, err = lbaas.createLoadBalancer(apiService, name, clusterName, lbClass, internalAnnotation, portID)
if err != nil {
return nil, fmt.Errorf("error creating loadbalancer %s: %v", name, err)
}
} else {
klog.V(2).Infof("LoadBalancer %s(%s) already exists", loadbalancer.Name, loadbalancer.ID)
}
if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
return nil, err
}
oldListeners, err := openstackutil.GetListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
if err != nil {
return nil, fmt.Errorf("error getting LB %s listeners: %v", loadbalancer.Name, err)
}
curListenerMapping := make(map[listenerKey]*listeners.Listener)
for i, l := range oldListeners {
key := listenerKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
curListenerMapping[key] = &oldListeners[i]
}
for portIndex, port := range ports {
key := listenerKey{Protocol: listeners.Protocol(port.Protocol), Port: int(port.Port)}
listener, _ := curListenerMapping[key]
climit := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerConnLimit, "-1")
connLimit := -1
tmp, err := strconv.Atoi(climit)
if err != nil {
klog.V(4).Infof("Could not parse int value from \"%s\" error \"%v\" failing back to default", climit, err)
} else {
connLimit = tmp
}
if listener == nil {
listenerProtocol := listeners.Protocol(port.Protocol)
listenerCreateOpt := listeners.CreateOpts{
Name: cutString(fmt.Sprintf("listener_%d_%s", portIndex, name)),
Protocol: listenerProtocol,
ProtocolPort: int(port.Port),
ConnLimit: &connLimit,
LoadbalancerID: loadbalancer.ID,
}
klog.V(4).Infof("Creating listener for port %d using protocol: %s", int(port.Port), listenerProtocol)
listener, err = openstackutil.CreateListener(lbaas.lb, loadbalancer.ID, listenerCreateOpt)
if err != nil {
return nil, fmt.Errorf("failed to create listener for loadbalancer %s: %v", loadbalancer.ID, err)
}
klog.V(4).Infof("Listener %s created for loadbalancer %s", listener.ID, loadbalancer.ID)
} else {
listenerChanged := false
updateOpts := listeners.UpdateOpts{}
if connLimit != listener.ConnLimit {
updateOpts.ConnLimit = &connLimit
listenerChanged = true
}
if listenerChanged {
if err := openstackutil.UpdateListener(lbaas.lb, loadbalancer.ID, listener.ID, updateOpts); err != nil {
return nil, fmt.Errorf("failed to update listener %s of loadbalancer %s: %v", listener.ID, loadbalancer.ID, err)
}
klog.V(4).Infof("Listener %s updated for loadbalancer %s", listener.ID, loadbalancer.ID)
}
}
// After all ports have been processed, remaining listeners are removed as obsolete.
// Pop valid listeners.
oldListeners = popListener(oldListeners, listener.ID)
pool, err := openstackutil.GetPoolByListener(lbaas.lb, loadbalancer.ID, listener.ID)
if err != nil && err != openstackutil.ErrNotFound {
return nil, fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
}
if pool == nil {
// Use the protocol of the listener
poolProto := v2pools.Protocol(listener.Protocol)
lbmethod := v2pools.LBMethod(lbaas.opts.LBMethod)
createOpt := v2pools.CreateOpts{
Name: cutString(fmt.Sprintf("pool_%d_%s", portIndex, name)),
Protocol: poolProto,
LBMethod: lbmethod,
ListenerID: listener.ID,
Persistence: persistence,
}
klog.Infof("Creating pool for listener %s using protocol %s", listener.ID, poolProto)
pool, err = openstackutil.CreatePool(lbaas.lb, createOpt, loadbalancer.ID)
if err != nil {
return nil, err
}
}
klog.V(4).Infof("Pool created for listener %s: %s", listener.ID, pool.ID)
members, err := openstackutil.GetMembersbyPool(lbaas.lb, pool.ID)
if err != nil && !cpoerrors.IsNotFound(err) {
return nil, fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
}
for _, node := range nodes {
addr, err := nodeAddressForLB(node)
if err != nil {
if err == cpoerrors.ErrNotFound {
// Node failure, do not create member
klog.Warningf("Failed to create LB pool member for node %s: %v", node.Name, err)
continue
} else {
return nil, fmt.Errorf("error getting address for node %s: %v", node.Name, err)
}
}
if !memberExists(members, addr, int(port.NodePort)) {
klog.V(4).Infof("Creating member for pool %s", pool.ID)
mc := metrics.NewMetricContext("loadbalancer_member", "create")
_, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
Name: cutString(fmt.Sprintf("member_%d_%s_%s", portIndex, node.Name, name)),
ProtocolPort: int(port.NodePort),
Address: addr,
SubnetID: lbaas.opts.SubnetID,
}).Extract()
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("error creating LB pool member for node: %s, %v", node.Name, err)
}
if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
return nil, err
}
} else {
// After all members have been processed, remaining members are deleted as obsolete.
members = popMember(members, addr, int(port.NodePort))
}
klog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, node.Name, addr)
}
// Delete obsolete members for this pool
for _, member := range members {
klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
mc := metrics.NewMetricContext("loadbalancer_member", "delete")
err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
if err != nil && !cpoerrors.IsNotFound(err) {
_ = mc.ObserveRequest(err)
return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
}
_ = mc.ObserveRequest(nil)
if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
return nil, err
}
}
monitorID := pool.MonitorID
enableHealthMonitor := getBoolFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerEnableHealthMonitor, lbaas.opts.CreateMonitor)
if monitorID == "" && enableHealthMonitor {
klog.Infof("Creating monitor for pool %s", pool.ID)
monitorProtocol := string(port.Protocol)
if port.Protocol == corev1.ProtocolUDP {
monitorProtocol = "UDP-CONNECT"
}
createOpts := v2monitors.CreateOpts{
Name: cutString(fmt.Sprintf("monitor_%d_%s)", portIndex, name)),
PoolID: pool.ID,
Type: monitorProtocol,
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
}
monitor, err := openstackutil.CreateHealthMonitor(lbaas.lb, createOpts, loadbalancer.ID)
if err != nil {
return nil, err
}
monitorID = monitor.ID
} else if monitorID != "" && !enableHealthMonitor {
klog.Infof("Deleting health monitor %s for pool %s", monitorID, pool.ID)
mc := metrics.NewMetricContext("loadbalancer_healthmonitor", "delete")
err := v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("failed to delete health monitor %s for pool %s, error: %v", monitorID, pool.ID, err)
}
}
}
// All remaining listeners are obsolete, delete
for _, listener := range oldListeners {
klog.V(4).Infof("Deleting obsolete listener %s:", listener.ID)
// get pool for listener
pool, err := openstackutil.GetPoolByListener(lbaas.lb, loadbalancer.ID, listener.ID)
if err != nil && err != openstackutil.ErrNotFound {
return nil, fmt.Errorf("error getting pool for obsolete listener %s: %v", listener.ID, err)
}
if pool != nil {
// get and delete monitor
monitorID := pool.MonitorID
if monitorID != "" {
klog.Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID)
if err := openstackutil.DeleteHealthMonitor(lbaas.lb, monitorID, loadbalancer.ID); err != nil {
return nil, err
}
}
// get and delete pool members
members, err := openstackutil.GetMembersbyPool(lbaas.lb, pool.ID)
if err != nil && !cpoerrors.IsNotFound(err) {
return nil, fmt.Errorf("error getting members for pool %s: %v", pool.ID, err)
}
if members != nil {
for _, member := range members {
klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
mc := metrics.NewMetricContext("loadbalancer_member", "delete")
err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
if err != nil && !cpoerrors.IsNotFound(err) {
_ = mc.ObserveRequest(err)
return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
}
_ = mc.ObserveRequest(nil)
if err := openstackutil.WaitLoadbalancerActive(lbaas.lb, loadbalancer.ID); err != nil {
return nil, err
}
}
}
klog.Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID)
// delete pool
if err := openstackutil.DeletePool(lbaas.lb, pool.ID, loadbalancer.ID); err != nil {
return nil, err
}
}
// delete listener
if err := openstackutil.DeleteListener(lbaas.lb, listener.ID, loadbalancer.ID); err != nil {
return nil, err
}
klog.Infof("Deleted obsolete listener: %s", listener.ID)
}
// Priority of choosing VIP port floating IP:
// 1. The floating IP that is already attached to the VIP port.
// 2. Floating IP specified in Spec.LoadBalancerIP
// 3. Create a new one
var floatIP *floatingips.FloatingIP
if !internalAnnotation {
// first attempt: fetch floating IP attached to load balancer's VIP port
portID := loadbalancer.VipPortID
floatIP, err = openstackutil.GetFloatingIPByPortID(lbaas.network, portID)
if err != nil {
return nil, fmt.Errorf("failed when getting floating IP for port %s: %v", portID, err)
}
klog.V(4).Infof("Found floating IP %q by loadbalancer port ID %q", floatIP, portID)
// second attempt: fetch floating IP specified in service Spec.LoadBalancerIP
// if found, associate floating IP with loadbalancer's VIP port
loadBalancerIP := apiService.Spec.LoadBalancerIP
if floatIP == nil && loadBalancerIP != "" {
opts := floatingips.ListOpts{
FloatingIP: loadBalancerIP,
}
existingIPs, err := openstackutil.GetFloatingIPs(lbaas.network, opts)
if err != nil {
return nil, fmt.Errorf("failed when trying to get existing floating IP %s, error: %v", loadBalancerIP, err)
}
klog.V(4).Infof("Found floating IPs %v by loadbalancer IP %q", existingIPs, loadBalancerIP)
if len(existingIPs) > 0 {
floatingip := existingIPs[0]
if len(floatingip.PortID) == 0 {
floatUpdateOpts := floatingips.UpdateOpts{
PortID: &portID,
}
klog.V(4).Infof("Attaching floating IP %q to loadbalancer port %q", floatingip.FloatingIP, portID)
mc := metrics.NewMetricContext("floating_ip", "update")
floatIP, err = floatingips.Update(lbaas.network, floatingip.ID, floatUpdateOpts).Extract()
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("error updating LB floating IP %+v: %v", floatUpdateOpts, err)
}
} else {
return nil, fmt.Errorf("floating IP %s is not available", loadBalancerIP)
}
}
}
// third attempt: create a new floating IP
if floatIP == nil {
if floatingNetworkID != "" {
klog.V(4).Infof("Creating floating IP %s for loadbalancer %s", loadBalancerIP, loadbalancer.ID)
floatIPOpts := floatingips.CreateOpts{
FloatingNetworkID: floatingNetworkID,
PortID: portID,
Description: fmt.Sprintf("Floating IP for Kubernetes external service %s from cluster %s", serviceName, clusterName),
}
if floatingSubnetID != "" {
floatIPOpts.SubnetID = floatingSubnetID
}
if loadBalancerIP != "" {
floatIPOpts.FloatingIP = loadBalancerIP
}
klog.V(4).Infof("creating floating IP with opts %+v", floatIPOpts)
mc := metrics.NewMetricContext("floating_ip", "create")
floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
if mc.ObserveRequest(err) != nil {
return nil, fmt.Errorf("error creating LB floating IP %+v: %v", floatIPOpts, err)
}
} else {
klog.Warningf("Failed to find floating network information, for Service %s,"+
"forcing to ensure an internal load balancer service", serviceName)
}
}
}
status := &corev1.LoadBalancerStatus{}
if floatIP != nil {
status.Ingress = []corev1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
} else {
status.Ingress = []corev1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
}
if lbaas.opts.ManageSecurityGroups {
err := lbaas.ensureSecurityGroup(clusterName, apiService, nodes, loadbalancer)
if err != nil {
return status, fmt.Errorf("failed when reconciling security groups for LB service %v/%v: %v", apiService.Namespace, apiService.Name, err)
}
}
return status, nil
}