in pkg/neg/manager.go [148:247]
// EnsureSyncers reconciles the syncers of service namespace/name against the
// desired port set newPorts. It holds manager.mu for the whole call.
//
// The desired ports are compared with the ports previously recorded for the
// service in manager.svcPortMap and split into three groups:
//   - removes: the syncer (if any) is stopped and ensureDeleteSvcNegCR is called
//     for the port's NEG name;
//   - same ports: ensureSvcNegCR is called again for each of them;
//   - adds: ensureSvcNegCR is called, then a transaction syncer is created when
//     none is registered under the syncer key, and started if it is stopped.
//
// newPorts is stored as the service's port map before any of the per-port work
// runs, regardless of whether that work succeeds.
//
// It returns the number of same/added ports that were handled successfully, the
// number of same/added ports that failed, and the aggregate of every error
// encountered. Errors from the removal loop are part of the aggregate but are
// not reflected in either counter.
func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts negtypes.PortInfoMap) (int, int, error) {
	manager.mu.Lock()
	defer manager.mu.Unlock()
	start := time.Now()

	key := getServiceKey(namespace, name)
	currentPorts, ok := manager.svcPortMap[key]
	if !ok {
		currentPorts = make(negtypes.PortInfoMap)
	}

	removes := currentPorts.Difference(newPorts)
	adds := newPorts.Difference(currentPorts)
	// NOTE(review): samePorts is derived from adds before removeCommonPorts runs.
	// If removeCommonPorts deletes entries from adds (as the comment below
	// describes), those ports end up in neither samePorts nor adds and are not
	// visited by the loops further down - confirm this is intended.
	samePorts := newPorts.Difference(adds)

	// There may be duplicate ports in adds and removes due to a difference in the readinessGate flag.
	// Service/Ingress config changes can cause readinessGate to be turned on or off for the same service port.
	// Removing the duplicate ports from removes and adds prevents disruption of the NEG syncer due to such
	// config changes, so the existing NEG syncer for the service port keeps working.
	manager.removeCommonPorts(adds, removes)
	manager.svcPortMap[key] = newPorts
	klog.V(3).Infof("EnsureSyncer %v/%v: syncing %v ports, removing %v ports, adding %v ports", namespace, name, newPorts, removes, adds)

	// A nil slice is sufficient here: append works on it and the aggregate of an
	// empty list is nil.
	var errList []error
	successfulSyncers := 0
	errorSyncers := 0

	// Stop the syncer of every port that is no longer desired and delete its SvcNeg CR.
	// The syncer is only stopped; its entry in syncerMap is left in place.
	for svcPort, portInfo := range removes {
		if syncer, ok := manager.syncerMap[manager.getSyncerKey(namespace, name, svcPort, portInfo)]; ok {
			syncer.Stop()
		}
		if err := manager.ensureDeleteSvcNegCR(namespace, portInfo.NegName); err != nil {
			errList = append(errList, err)
		}
	}

	for _, portInfo := range samePorts {
		// To reduce the possibility of NEGs being leaked, ensure a SvcNeg CR exists for every
		// desired port.
		if err := manager.ensureSvcNegCR(key, portInfo); err != nil {
			errList = append(errList, fmt.Errorf("failed to ensure svc neg cr %s/%s/%d for existing port: %w", namespace, portInfo.NegName, portInfo.PortTuple.Port, err))
			errorSyncers++
			continue
		}
		successfulSyncers++
	}

	// Ensure a syncer is running for each port that is being added.
	for svcPort, portInfo := range adds {
		syncerKey := manager.getSyncerKey(namespace, name, svcPort, portInfo)
		syncer, ok := manager.syncerMap[syncerKey]
		if !ok {
			// To ensure that a NEG CR always exists during the lifecycle of a NEG, do not create a
			// syncer for the NEG until the NEG CR is successfully created. This reduces the
			// possibility of invalid states and the complexity of garbage collection.
			if err := manager.ensureSvcNegCR(key, portInfo); err != nil {
				errList = append(errList, fmt.Errorf("failed to ensure svc neg cr %s/%s/%d for new port: %w", namespace, portInfo.NegName, svcPort.ServicePort, err))
				errorSyncers++
				continue
			}

			// Determine the implementation that calculates NEG endpoints on each sync.
			epc := negsyncer.GetEndpointsCalculator(manager.nodeLister, manager.podLister, manager.zoneGetter,
				syncerKey, portInfo.EpCalculatorMode)
			syncer = negsyncer.NewTransactionSyncer(
				syncerKey,
				manager.recorder,
				manager.cloud,
				manager.zoneGetter,
				manager.podLister,
				manager.serviceLister,
				manager.endpointLister,
				manager.endpointSliceLister,
				manager.nodeLister,
				manager.svcNegLister,
				manager.reflector,
				epc,
				string(manager.kubeSystemUID),
				manager.svcNegClient,
				!manager.namer.IsNEG(portInfo.NegName),
				manager.enableEndpointSlices,
			)
			manager.syncerMap[syncerKey] = syncer
		}

		// A syncer found in syncerMap may have been stopped earlier; a freshly
		// created one is started here as well if it reports itself as stopped.
		if syncer.IsStopped() {
			if err := syncer.Start(); err != nil {
				errList = append(errList, err)
				errorSyncers++
				continue
			}
		}
		successfulSyncers++
	}

	err := utilerrors.NewAggregate(errList)
	metrics.PublishNegManagerProcessMetrics(metrics.SyncProcess, err, start)
	return successfulSyncers, errorSyncers, err
}