func (manager *syncerManager) EnsureSyncers()

in pkg/neg/manager.go [148:247]


func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts negtypes.PortInfoMap) (int, int, error) {
	manager.mu.Lock()
	defer manager.mu.Unlock()
	start := time.Now()
	key := getServiceKey(namespace, name)
	currentPorts, ok := manager.svcPortMap[key]
	if !ok {
		currentPorts = make(negtypes.PortInfoMap)
	}

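	// Diff the previously tracked ports against the desired ports: removes holds ports
	// that should no longer be synced, adds holds newly desired ports, and samePorts
	// holds desired ports that are already being synced unchanged.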
	removes := currentPorts.Difference(newPorts)
	adds := newPorts.Difference(currentPorts)
	samePorts := newPorts.Difference(adds)
	// Duplicate ports may appear in both adds and removes when only the readinessGate flag differs:
	// Service/Ingress config changes can turn readinessGate on or off for the same service port.
	// Removing these duplicates from both sets prevents such config changes from disrupting the NEG syncer,
	// so the existing NEG syncer for the service port keeps running.
	manager.removeCommonPorts(adds, removes)
	manager.svcPortMap[key] = newPorts
	klog.V(3).Infof("EnsureSyncer %v/%v: syncing %v ports, removing %v ports, adding %v ports", namespace, name, newPorts, removes, adds)

	errList := []error{}
	successfulSyncers := 0
	errorSyncers := 0
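	// Stop the syncer for each port that is no longer desired and clean up its SvcNeg CR.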
	for svcPort, portInfo := range removes {
		syncer, ok := manager.syncerMap[manager.getSyncerKey(namespace, name, svcPort, portInfo)]
		if ok {
			syncer.Stop()
		}

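		// Ensure the SvcNeg CR is deleted even if no syncer was tracked for this port.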
		err := manager.ensureDeleteSvcNegCR(namespace, portInfo.NegName)
		if err != nil {
			errList = append(errList, err)
		}
	}

	for _, portInfo := range samePorts {
		// To reduce the possibility of NEGs being leaked, ensure a SvcNeg CR exists for every
		// desired port.
		if err := manager.ensureSvcNegCR(key, portInfo); err != nil {
			errList = append(errList, fmt.Errorf("failed to ensure svc neg cr %s/%s/%d for existing port: %w", namespace, portInfo.NegName, portInfo.PortTuple.Port, err))
			errorSyncers += 1
		} else {
			successfulSyncers += 1
		}
	}

	// Ensure a syncer is running for each port that is being added.
	for svcPort, portInfo := range adds {
		syncerKey := manager.getSyncerKey(namespace, name, svcPort, portInfo)
		syncer, ok := manager.syncerMap[syncerKey]
		if !ok {

			// To ensure that a NEG CR always exists during the lifecycle of a NEG, do not create a
			// syncer for the NEG until the NEG CR is successfully created. This reduces the
			// possibility of invalid states and the complexity of garbage collection.
			if err := manager.ensureSvcNegCR(key, portInfo); err != nil {
				errList = append(errList, fmt.Errorf("failed to ensure svc neg cr %s/%s/%d for new port: %w", namespace, portInfo.NegName, svcPort.ServicePort, err))
				errorSyncers += 1
				continue
			}

			// Determine the implementation that calculates NEG endpoints on each sync.
			epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter,
				syncerKey, portInfo.EpCalculatorMode)
			syncer = negsyncer.NewTransactionSyncer(
				syncerKey,
				manager.recorder,
				manager.cloud,
				manager.zoneGetter,
				manager.podLister,
				manager.serviceLister,
				manager.endpointLister,
				manager.endpointSliceLister,
				manager.nodeLister,
				manager.svcNegLister,
				manager.reflector,
				epc,
				string(manager.kubeSystemUID),
				manager.svcNegClient,
				!manager.namer.IsNEG(portInfo.NegName),
				manager.enableEndpointSlices,
			)
			manager.syncerMap[syncerKey] = syncer
		}

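		// Start (or restart) the syncer if it is not already running.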
		if syncer.IsStopped() {
			if err := syncer.Start(); err != nil {
				errList = append(errList, err)
				errorSyncers += 1
				continue
			}
		}
		successfulSyncers += 1
	}
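	// Aggregate all errors and publish sync-process metrics before returning.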
	err := utilerrors.NewAggregate(errList)
	metrics.PublishNegManagerProcessMetrics(metrics.SyncProcess, err, start)

	return successfulSyncers, errorSyncers, err
}
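
The heart of EnsureSyncers is the three-way diff between the currently tracked ports and the newly desired ports, plus the dedup step that keeps a syncer alive when only the readinessGate flag flips. Below is a minimal, self-contained sketch of that bookkeeping; the portInfo type, the difference helper, the removeCommonPorts helper, and the example values are illustrative assumptions for this sketch, not the real negtypes.PortInfoMap API.

// Illustrative sketch only: stand-ins for negtypes.PortInfoMap and its Difference method.
package main

import "fmt"

// portInfo stands in for the real PortInfo value (assumed fields).
type portInfo struct {
	negName       string
	readinessGate bool
}

// difference returns the entries of a that are missing from b or whose value differs in b.
func difference(a, b map[int32]portInfo) map[int32]portInfo {
	out := map[int32]portInfo{}
	for port, info := range a {
		if other, ok := b[port]; !ok || other != info {
			out[port] = info
		}
	}
	return out
}

// removeCommonPorts drops ports that appear in both adds and removes with the same
// NEG name, i.e. ports whose only change is the readinessGate flag, so their
// existing syncers are left untouched.
func removeCommonPorts(adds, removes map[int32]portInfo) {
	for port, addInfo := range adds {
		if rmInfo, ok := removes[port]; ok && addInfo.negName == rmInfo.negName {
			delete(adds, port)
			delete(removes, port)
		}
	}
}

func main() {
	current := map[int32]portInfo{
		80:   {negName: "neg-80"},
		8080: {negName: "neg-8080"},
	}
	desired := map[int32]portInfo{
		80:  {negName: "neg-80", readinessGate: true}, // only the flag changed
		443: {negName: "neg-443"},
	}

	removes := difference(current, desired) // ports to stop syncing
	adds := difference(desired, current)    // ports to start syncing
	removeCommonPorts(adds, removes)

	fmt.Println("removes:", removes) // only 8080: port 80 keeps its existing syncer
	fmt.Println("adds:", adds)       // only 443
}

The real Difference operates on richer keys (service port tuples and NEG names), but the control flow above is the same as in EnsureSyncers: anything left in removes has its syncer stopped and its SvcNeg CR deleted, and anything left in adds gets a SvcNeg CR and a freshly started syncer.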