in pkg/openstack/loadbalancer.go [2761:2913]
// updateLoadBalancer brings the pool members of the load balancer that backs
// service in line with the supplied nodes.
//
// If lbaas.opts.UseOctavia is set, the call is forwarded unchanged to
// updateOctaviaLoadBalancer. Otherwise the legacy path below runs:
//
//  1. lbaas.opts.SubnetID is overwritten with the value resolved from the
//     service annotation (falling back to its current value) and, if still
//     empty while nodes are available, with the subnet looked up from the
//     first node.
//  2. The load balancer is located by its current or legacy name; its
//     listeners are indexed by (protocol, port) and one pool is fetched per
//     listener.
//  3. For every service port, members are created for node addresses that are
//     missing (or registered with a different node port) and members that no
//     longer match are deleted. After each change the function waits for the
//     load balancer to become active again.
//  4. If lbaas.opts.ManageSecurityGroups is set, the security group is updated.
//
// The first failure aborts the reconciliation and is returned; a "not found"
// answer when deleting a member is tolerated.
func (lbaas *LbaasV2) updateLoadBalancer(ctx context.Context, clusterName string, service *corev1.Service, nodes []*corev1.Node) error {
	if lbaas.opts.UseOctavia {
		return lbaas.updateOctaviaLoadBalancer(ctx, clusterName, service, nodes)
	}

	// Everything below exists only for legacy Neutron-LBaaS, which has been
	// deprecated since OpenStack stable/queens and is not recommended for
	// production use. Do not add new features here.

	serviceName := service.Namespace + "/" + service.Name
	klog.V(4).Infof("UpdateLoadBalancer(%v, %s, %v)", clusterName, serviceName, nodes)

	lbaas.opts.SubnetID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
	if len(nodes) > 0 && len(lbaas.opts.SubnetID) == 0 {
		// No subnet configured: derive it from a node, because the LB must be
		// given instance addresses that live on the same subnet.
		discovered, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
		if err != nil {
			klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
			return fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
				"and failed to find subnet-id from OpenStack: %v", service.Namespace, service.Name, err)
		}
		lbaas.opts.SubnetID = discovered
	}

	svcPorts := service.Spec.Ports
	if len(svcPorts) == 0 {
		return fmt.Errorf("no ports provided to openstack load balancer")
	}

	// Arguments are evaluated left to right, so the current name is computed
	// before the legacy one.
	lb, err := getLoadbalancerByName(
		lbaas.lb,
		lbaas.GetLoadBalancerName(ctx, clusterName, service),
		lbaas.getLoadBalancerLegacyName(ctx, clusterName, service),
	)
	if err != nil {
		return err
	}
	if lb == nil {
		return fmt.Errorf("loadbalancer does not exist for Service %s", serviceName)
	}

	// listenerKey identifies a listener by the protocol/port pair it serves.
	type listenerKey struct {
		Protocol listeners.Protocol
		Port     int
	}

	found, err := openstackutil.GetListenersByLoadBalancerID(lbaas.lb, lb.ID)
	if err != nil {
		return fmt.Errorf("error getting listeners for LB %s: %v", lb.ID, err)
	}

	// Index the listeners by key and fetch the pool behind each of them,
	// keyed by listener ID.
	listenersByKey := make(map[listenerKey]listeners.Listener, len(found))
	poolsByListener := make(map[string]v2pools.Pool, len(found))
	for _, lst := range found {
		listenersByKey[listenerKey{Protocol: listeners.Protocol(lst.Protocol), Port: lst.ProtocolPort}] = lst

		p, err := openstackutil.GetPoolByListener(lbaas.lb, lb.ID, lst.ID)
		if err != nil {
			return fmt.Errorf("error getting pool for listener %s: %v", lst.ID, err)
		}
		poolsByListener[lst.ID] = *p
	}

	// Desired state: one member per node, keyed by the node's LB address.
	wanted := make(map[string]*corev1.Node, len(nodes))
	for _, n := range nodes {
		address, err := nodeAddressForLB(n)
		if err != nil {
			return err
		}
		wanted[address] = n
	}

	// Reconcile the members of every service port.
	for idx, svcPort := range svcPorts {
		nodePort := int(svcPort.NodePort)

		// Resolve the listener serving this port, then the pool behind it.
		key := listenerKey{
			Protocol: getListenerProtocol(svcPort.Protocol, nil),
			Port:     int(svcPort.Port),
		}
		lst, hasListener := listenersByKey[key]
		if !hasListener {
			return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", lb.ID, svcPort.Port, svcPort.Protocol)
		}
		pool, hasPool := poolsByListener[lst.ID]
		if !hasPool {
			return fmt.Errorf("loadbalancer %s does not contain required pool for listener %s", lb.ID, lst.ID)
		}

		// Current state: members already registered in the pool, by address.
		current, err := openstackutil.GetMembersbyPool(lbaas.lb, pool.ID)
		if err != nil {
			return fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
		}
		existing := make(map[string]v2pools.Member, len(current))
		for _, m := range current {
			existing[m.Address] = m
		}

		// Create whatever is wanted but not registered with the right port.
		for address, n := range wanted {
			if m, present := existing[address]; present && m.ProtocolPort == nodePort {
				continue
			}

			mc := metrics.NewMetricContext("loadbalancer_member", "create")
			createOpts := v2pools.CreateMemberOpts{
				Name:         cutString(fmt.Sprintf("member_%d_%s_%s", idx, n.Name, lb.Name)),
				Address:      address,
				ProtocolPort: nodePort,
				SubnetID:     lbaas.opts.SubnetID,
			}
			_, createErr := v2pools.CreateMember(lbaas.lb, pool.ID, createOpts).Extract()
			if mc.ObserveRequest(createErr) != nil {
				return createErr
			}

			if waitErr := openstackutil.WaitLoadbalancerActive(lbaas.lb, lb.ID); waitErr != nil {
				return waitErr
			}
		}

		// Delete whatever is registered but no longer wanted on this port.
		for _, m := range existing {
			if _, keep := wanted[m.Address]; keep && m.ProtocolPort == nodePort {
				continue
			}

			mc := metrics.NewMetricContext("loadbalancer_member", "delete")
			deleteErr := v2pools.DeleteMember(lbaas.lb, pool.ID, m.ID).ExtractErr()
			if deleteErr != nil && !cpoerrors.IsNotFound(deleteErr) {
				return mc.ObserveRequest(deleteErr)
			}
			// Either the delete succeeded or the member was already gone;
			// both are recorded as a successful request.
			_ = mc.ObserveRequest(nil)

			if waitErr := openstackutil.WaitLoadbalancerActive(lbaas.lb, lb.ID); waitErr != nil {
				return waitErr
			}
		}
	}

	if !lbaas.opts.ManageSecurityGroups {
		return nil
	}
	if sgErr := lbaas.updateSecurityGroup(clusterName, service, nodes); sgErr != nil {
		return fmt.Errorf("failed to update Security Group for loadbalancer service %s: %v", serviceName, sgErr)
	}
	return nil
}