in openstack/openstack_loadbalancer.go [1183:1323]
func (lbaas *LbaasV2) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
lbaas.opts.SubnetID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
if len(lbaas.opts.SubnetID) == 0 && len(nodes) > 0 {
// Get SubnetID automatically.
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
if err != nil {
klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
return fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
"and failed to find subnet-id from OpenStack: %v", service.Namespace, service.Name, err)
}
lbaas.opts.SubnetID = subnetID
}
ports := service.Spec.Ports
if len(ports) == 0 {
return fmt.Errorf("no ports provided to openstack load balancer")
}
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
if err != nil {
return err
}
if loadbalancer == nil {
return fmt.Errorf("loadbalancer %s does not exist", loadBalancerName)
}
// Get all listeners for this loadbalancer, by "port key".
type portKey struct {
Protocol listeners.Protocol
Port int
}
var listenerIDs []string
lbListeners := make(map[portKey]listeners.Listener)
allListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("error getting listeners for LB %s: %v", loadBalancerName, err)
}
for _, l := range allListeners {
key := portKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
lbListeners[key] = l
listenerIDs = append(listenerIDs, l.ID)
}
// Get all pools for this loadbalancer, by listener ID.
lbPools := make(map[string]v2pools.Pool)
for _, listenerID := range listenerIDs {
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listenerID)
if err != nil {
return fmt.Errorf("error getting pool for listener %s: %v", listenerID, err)
}
lbPools[listenerID] = *pool
}
// Compose Set of member (addresses) that _should_ exist
addrs := make(map[string]*v1.Node)
for _, node := range nodes {
addr, err := nodeAddressForLB(node)
if err != nil {
return err
}
addrs[addr] = node
}
// Check for adding/removing members associated with each port
for portIndex, port := range ports {
// Get listener associated with this port
listener, ok := lbListeners[portKey{
Protocol: toListenersProtocol(port.Protocol),
Port: int(port.Port),
}]
if !ok {
return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
}
// Get pool associated with this listener
pool, ok := lbPools[listener.ID]
if !ok {
return fmt.Errorf("loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
}
// Find existing pool members (by address) for this port
getMembers, err := getMembersByPoolID(lbaas.lb, pool.ID)
if err != nil {
return fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
}
members := make(map[string]v2pools.Member)
for _, member := range getMembers {
members[member.Address] = member
}
// Add any new members for this port
for addr, node := range addrs {
if _, ok := members[addr]; ok && members[addr].ProtocolPort == int(port.NodePort) {
// Already exists, do not create member
continue
}
_, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
Name: fmt.Sprintf("member_%s_%d_%s", loadbalancer.Name, portIndex, node.Name),
Address: addr,
ProtocolPort: int(port.NodePort),
SubnetID: lbaas.opts.SubnetID,
}).Extract()
if err != nil {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
// Remove any old members for this port
for _, member := range members {
if _, ok := addrs[member.Address]; ok && member.ProtocolPort == int(port.NodePort) {
// Still present, do not delete member
continue
}
err = v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
if err != nil && !isNotFound(err) {
return err
}
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
if err != nil {
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
}
}
}
if lbaas.opts.ManageSecurityGroups {
err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer)
if err != nil {
return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
}
}
return nil
}