func()

in gce/gce_loadbalancer_internal.go [55:254]


func (g *Cloud) ensureInternalLoadBalancer(clusterName, clusterID string, svc *v1.Service, existingFwdRule *compute.ForwardingRule, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
	if existingFwdRule == nil && !hasFinalizer(svc, ILBFinalizerV1) {
		// Neither the forwarding rule nor the V1 finalizer exists. This is most likely a new service.
		if g.AlphaFeatureGate.Enabled(AlphaFeatureILBSubsets) {
			// When ILBSubsets is enabled, new ILB services will not be processed here.
			// Services that have existing GCE resources created by this controller or the v1 finalizer
			// will continue to update.
			klog.V(2).Infof("Skipped ensureInternalLoadBalancer for service %s/%s, since %s feature is enabled.", svc.Namespace, svc.Name, AlphaFeatureILBSubsets)
			return nil, cloudprovider.ImplementedElsewhere
		}
		if hasFinalizer(svc, ILBFinalizerV2) {
			// No V1 resources present - Another controller is handling the resources for this service.
			klog.V(2).Infof("Skipped ensureInternalLoadBalancer for service %s/%s, as service contains %q finalizer.", svc.Namespace, svc.Name, ILBFinalizerV2)
			return nil, cloudprovider.ImplementedElsewhere
		}
	}

	nm := types.NamespacedName{Name: svc.Name, Namespace: svc.Namespace}

	var serviceState L4ILBServiceState
	// Mark the service InSuccess state as false to begin with.
	// This will be updated to true if the VIP is configured successfully.
	serviceState.InSuccess = false
	defer func() {
		g.metricsCollector.SetL4ILBService(nm.String(), serviceState)
	}()

	loadBalancerName := g.GetLoadBalancerName(context.TODO(), clusterName, svc)
	klog.V(2).Infof("ensureInternalLoadBalancer(%v): Attaching %q finalizer", loadBalancerName, ILBFinalizerV1)
	if err := addFinalizer(svc, g.client.CoreV1(), ILBFinalizerV1); err != nil {
		klog.Errorf("Failed to attach finalizer '%s' on service %s/%s - %v", ILBFinalizerV1, svc.Namespace, svc.Name, err)
		return nil, err
	}

	ports, _, protocol := getPortsAndProtocol(svc.Spec.Ports)
	if protocol != v1.ProtocolTCP && protocol != v1.ProtocolUDP {
		return nil, fmt.Errorf("Invalid protocol %s, only TCP and UDP are supported", string(protocol))
	}
	scheme := cloud.SchemeInternal
	options := getILBOptions(svc)
	if g.IsLegacyNetwork() {
		g.eventRecorder.Event(svc, v1.EventTypeWarning, "ILBOptionsIgnored", "Internal LoadBalancer options are not supported with Legacy Networks.")
		options = ILBOptions{}
	}

	sharedBackend := shareBackendService(svc)
	backendServiceName := makeBackendServiceName(loadBalancerName, clusterID, sharedBackend, scheme, protocol, svc.Spec.SessionAffinity)
	backendServiceLink := g.getBackendServiceLink(backendServiceName)

	// Ensure instance groups exist and nodes are assigned to groups
	igName := makeInstanceGroupName(clusterID)
	igLinks, err := g.ensureInternalInstanceGroups(igName, nodes)
	if err != nil {
		return nil, err
	}

	// Get existing backend service (if exists)
	var existingBackendService *compute.BackendService
	if existingFwdRule != nil && existingFwdRule.BackendService != "" {
		existingBSName := getNameFromLink(existingFwdRule.BackendService)
		if existingBackendService, err = g.GetRegionBackendService(existingBSName, g.region); err != nil && !isNotFound(err) {
			return nil, err
		}
	}

	// Lock the sharedResourceLock to prevent any deletions of shared resources while assembling shared resources here
	g.sharedResourceLock.Lock()
	defer g.sharedResourceLock.Unlock()

	// Ensure health check exists before creating the backend service. The health check is shared
	// if externalTrafficPolicy=Cluster.
	sharedHealthCheck := !servicehelpers.RequestsOnlyLocalTraffic(svc)
	hcName := makeHealthCheckName(loadBalancerName, clusterID, sharedHealthCheck)
	hcPath, hcPort := GetNodesHealthCheckPath(), GetNodesHealthCheckPort()
	if !sharedHealthCheck {
		// Service requires a special health check, retrieve the OnlyLocal port & path
		hcPath, hcPort = servicehelpers.GetServiceHealthCheckPathPort(svc)
	}
	hc, err := g.ensureInternalHealthCheck(hcName, nm, sharedHealthCheck, hcPath, hcPort)
	if err != nil {
		return nil, err
	}

	subnetworkURL := g.SubnetworkURL()
	// Any subnet specified using the subnet annotation will be picked up and reflected in the forwarding rule.
	// Removing the annotation will set the forwarding rule to use the default subnet and result in a VIP change.
	// In order to support existing ILBs that were setup using the wrong subnet - https://github.com/kubernetes/kubernetes/pull/57861,
	// users will need to specify that subnet with the annotation.
	if options.SubnetName != "" {
		subnetworkURL = gceSubnetworkURL("", g.networkProjectID, g.region, options.SubnetName)
	}
	// Determine IP which will be used for this LB. If no forwarding rule has been established
	// or specified in the Service spec, then requestedIP = "".
	ipToUse := ilbIPToUse(svc, existingFwdRule, subnetworkURL)

	klog.V(2).Infof("ensureInternalLoadBalancer(%v): Using subnet %s for LoadBalancer IP %s", loadBalancerName, options.SubnetName, ipToUse)

	var addrMgr *addressManager
	// If the network is not a legacy network, use the address manager
	if !g.IsLegacyNetwork() {
		addrMgr = newAddressManager(g, nm.String(), g.Region(), subnetworkURL, loadBalancerName, ipToUse, cloud.SchemeInternal)
		ipToUse, err = addrMgr.HoldAddress()
		if err != nil {
			return nil, err
		}
		klog.V(2).Infof("ensureInternalLoadBalancer(%v): reserved IP %q for the forwarding rule", loadBalancerName, ipToUse)
		defer func() {
			// Release the address if all resources were created successfully, or if we error out.
			if err := addrMgr.ReleaseAddress(); err != nil {
				klog.Errorf("ensureInternalLoadBalancer: failed to release address reservation, possibly causing an orphan: %v", err)
			}
		}()
	}

	// Ensure firewall rules if necessary
	if err = g.ensureInternalFirewalls(loadBalancerName, ipToUse, clusterID, nm, svc, strconv.Itoa(int(hcPort)), sharedHealthCheck, nodes); err != nil {
		return nil, err
	}

	fwdRuleDescription := &forwardingRuleDescription{ServiceName: nm.String()}
	fwdRuleDescriptionString, err := fwdRuleDescription.marshal()
	if err != nil {
		return nil, err
	}
	newFwdRule := &compute.ForwardingRule{
		Name:                loadBalancerName,
		Description:         fwdRuleDescriptionString,
		IPAddress:           ipToUse,
		BackendService:      backendServiceLink,
		Ports:               ports,
		IPProtocol:          string(protocol),
		LoadBalancingScheme: string(scheme),
		// Given that CreateGCECloud will attempt to determine the subnet based off the network,
		// the subnetwork should rarely be unknown.
		Subnetwork: subnetworkURL,
		Network:    g.networkURL,
	}
	if options.AllowGlobalAccess {
		newFwdRule.AllowGlobalAccess = options.AllowGlobalAccess
	}
	if len(ports) > maxL4ILBPorts {
		newFwdRule.Ports = nil
		newFwdRule.AllPorts = true
	}

	fwdRuleDeleted := false
	if existingFwdRule != nil && !forwardingRulesEqual(existingFwdRule, newFwdRule) {
		// Delete existing forwarding rule before making changes to the backend service. For example - changing protocol
		// of backend service without first deleting forwarding rule will throw an error since the linked forwarding
		// rule would show the old protocol.
		if klog.V(2).Enabled() {
			frDiff := cmp.Diff(existingFwdRule, newFwdRule)
			klog.V(2).Infof("ensureInternalLoadBalancer(%v): forwarding rule changed - Existing - %+v\n, New - %+v\n, Diff(-existing, +new) - %s\n. Deleting existing forwarding rule.", loadBalancerName, existingFwdRule, newFwdRule, frDiff)
		}
		if err = ignoreNotFound(g.DeleteRegionForwardingRule(loadBalancerName, g.region)); err != nil {
			return nil, err
		}
		fwdRuleDeleted = true
	}

	bsDescription := makeBackendServiceDescription(nm, sharedBackend)
	err = g.ensureInternalBackendService(backendServiceName, bsDescription, svc.Spec.SessionAffinity, scheme, protocol, igLinks, hc.SelfLink)
	if err != nil {
		return nil, err
	}

	if fwdRuleDeleted || existingFwdRule == nil {
		// existing rule has been deleted, pass in nil
		if err := g.ensureInternalForwardingRule(nil, newFwdRule); err != nil {
			return nil, err
		}
	}

	// Delete the previous internal load balancer resources if necessary
	if existingBackendService != nil {
		g.clearPreviousInternalResources(svc, loadBalancerName, existingBackendService, backendServiceName, hcName)
	}

	// Get the most recent forwarding rule for the address.
	updatedFwdRule, err := g.GetRegionForwardingRule(loadBalancerName, g.region)
	if err != nil {
		return nil, err
	}

	serviceState.InSuccess = true
	if options.AllowGlobalAccess {
		serviceState.EnabledGlobalAccess = true
	}
	// SubnetName is overridden to nil value if Alpha feature gate for custom subnet
	// is not enabled. So, a non empty subnet name at this point implies that the
	// feature is in use.
	if options.SubnetName != "" {
		serviceState.EnabledCustomSubnet = true
	}
	klog.V(6).Infof("Internal Loadbalancer for Service %s ensured, updating its state %v in metrics cache", nm, serviceState)

	status := &v1.LoadBalancerStatus{}
	status.Ingress = []v1.LoadBalancerIngress{{IP: updatedFwdRule.IPAddress}}
	return status, nil
}