in pkg/neg/syncers/transaction.go [179:276]
func (s *transactionSyncer) syncInternalImpl() error {
if s.needInit || s.isZoneChange() {
if err := s.ensureNetworkEndpointGroups(); err != nil {
return err
}
s.needInit = false
}
if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
klog.V(4).Infof("Skip syncing NEG %q for %s.", s.NegSyncerKey.NegName, s.NegSyncerKey.String())
return nil
}
klog.V(2).Infof("Sync NEG %q for %s, Endpoints Calculator mode %s", s.NegSyncerKey.NegName,
s.NegSyncerKey.String(), s.endpointsCalculator.Mode())
currentMap, err := retrieveExistingZoneNetworkEndpointMap(s.NegSyncerKey.NegName, s.zoneGetter, s.cloud, s.NegSyncerKey.GetAPIVersion(), s.endpointsCalculator.Mode())
if err != nil {
return err
}
s.logStats(currentMap, "current NEG endpoints")
// Merge the current state from cloud with the transaction table together
// The combined state represents the eventual result when all transactions completed
mergeTransactionIntoZoneEndpointMap(currentMap, s.transactions)
s.logStats(currentMap, "after in-progress operations have completed, NEG endpoints")
var targetMap map[string]negtypes.NetworkEndpointSet
var endpointPodMap negtypes.EndpointPodMap
if s.enableEndpointSlices {
slices, err := s.endpointSliceLister.ByIndex(endpointslices.EndpointSlicesByServiceIndex, endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name))
if err != nil {
return err
}
if len(slices) < 1 {
klog.Warningf("Endpoint slices for service %s/%s don't exist. Skipping NEG sync", s.Namespace, s.Name)
return nil
}
endpointSlices := make([]*discovery.EndpointSlice, len(slices))
for i, slice := range slices {
endpointSlices[i] = slice.(*discovery.EndpointSlice)
}
endpointsData := negtypes.EndpointsDataFromEndpointSlices(endpointSlices)
targetMap, endpointPodMap, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
}
} else {
ep, exists, err := s.endpointLister.Get(
&apiv1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: s.Name,
Namespace: s.Namespace,
},
},
)
if err != nil {
return err
}
if !exists {
klog.Warningf("Endpoint %s/%s does not exist. Skipping NEG sync", s.Namespace, s.Name)
return nil
}
endpointsData := negtypes.EndpointsDataFromEndpoints(ep.(*apiv1.Endpoints))
targetMap, endpointPodMap, err = s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap)
if err != nil {
return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
}
}
s.logStats(targetMap, "desired NEG endpoints")
// Calculate the endpoints to add and delete to transform the current state to desire state
addEndpoints, removeEndpoints := calculateNetworkEndpointDifference(targetMap, currentMap)
// Calculate Pods that are already in the NEG
_, committedEndpoints := calculateNetworkEndpointDifference(addEndpoints, targetMap)
// Filter out the endpoints with existing transaction
// This mostly happens when transaction entry require reconciliation but the transaction is still progress
// e.g. endpoint A is in the process of adding to NEG N, and the new desire state is not to have A in N.
// This ensures the endpoint that requires reconciliation to wait till the existing transaction to complete.
filterEndpointByTransaction(addEndpoints, s.transactions)
filterEndpointByTransaction(removeEndpoints, s.transactions)
// filter out the endpoints that are in transaction
filterEndpointByTransaction(committedEndpoints, s.transactions)
if s.needCommit() {
s.commitPods(committedEndpoints, endpointPodMap)
}
if len(addEndpoints) == 0 && len(removeEndpoints) == 0 {
klog.V(4).Infof("No endpoint change for %s/%s, skip syncing NEG. ", s.Namespace, s.Name)
return nil
}
s.logEndpoints(addEndpoints, "adding endpoint")
s.logEndpoints(removeEndpoints, "removing endpoint")
return s.syncNetworkEndpoints(addEndpoints, removeEndpoints)
}