func()

in pkg/neg/syncers/transaction.go [179:276]


// syncInternalImpl performs a single NEG sync pass for this syncer.
//
// The pass proceeds as follows:
//   - When needInit is set or isZoneChange reports true, the network endpoint
//     groups are ensured first; a failure there aborts the pass.
//   - When the syncer is stopped or shutting down, the pass ends with nil.
//   - The existing per-zone endpoint map is retrieved and the in-flight
//     transactions are merged into it.
//   - The target endpoint map is calculated from the service's EndpointSlices
//     (when enableEndpointSlices is set) or from its Endpoints object.
//   - The target is diffed against the merged current map, endpoints that
//     already have a transaction entry are filtered out, pods are committed
//     when needCommit reports true, and the remaining additions and removals
//     are passed to syncNetworkEndpoints.
//
// An empty EndpointSlice list or a missing Endpoints object is logged as a
// warning and treated as a successful no-op. Lister and retrieval errors are
// returned unchanged; a calculation error is wrapped together with the
// calculator mode.
func (s *transactionSyncer) syncInternalImpl() error {
	// needInit is checked first so isZoneChange is only consulted when no
	// initialization is pending.
	if s.needInit || s.isZoneChange() {
		initErr := s.ensureNetworkEndpointGroups()
		if initErr != nil {
			return initErr
		}
		s.needInit = false
	}

	if s.syncer.IsStopped() || s.syncer.IsShuttingDown() {
		klog.V(4).Infof("Skip syncing NEG %q for %s.", s.NegSyncerKey.NegName, s.NegSyncerKey.String())
		return nil
	}
	klog.V(2).Infof(
		"Sync NEG %q for %s, Endpoints Calculator mode %s",
		s.NegSyncerKey.NegName, s.NegSyncerKey.String(), s.endpointsCalculator.Mode(),
	)

	negState, err := retrieveExistingZoneNetworkEndpointMap(
		s.NegSyncerKey.NegName,
		s.zoneGetter,
		s.cloud,
		s.NegSyncerKey.GetAPIVersion(),
		s.endpointsCalculator.Mode(),
	)
	if err != nil {
		return err
	}
	s.logStats(negState, "current NEG endpoints")

	// Fold the pending transactions into the retrieved state so that negState
	// describes the NEG as it will look once every in-flight operation is done.
	mergeTransactionIntoZoneEndpointMap(negState, s.transactions)
	s.logStats(negState, "after in-progress operations have completed, NEG endpoints")

	var (
		desired map[string]negtypes.NetworkEndpointSet
		podMap  negtypes.EndpointPodMap
	)
	if s.enableEndpointSlices {
		objs, listErr := s.endpointSliceLister.ByIndex(
			endpointslices.EndpointSlicesByServiceIndex,
			endpointslices.FormatEndpointSlicesServiceKey(s.Namespace, s.Name),
		)
		if listErr != nil {
			return listErr
		}
		if len(objs) == 0 {
			klog.Warningf("Endpoint slices for service %s/%s don't exist. Skipping NEG sync", s.Namespace, s.Name)
			return nil
		}
		// The indexer hands back untyped objects; convert them before use.
		typed := make([]*discovery.EndpointSlice, 0, len(objs))
		for _, obj := range objs {
			typed = append(typed, obj.(*discovery.EndpointSlice))
		}
		desired, podMap, err = s.endpointsCalculator.CalculateEndpoints(
			negtypes.EndpointsDataFromEndpointSlices(typed), negState)
	} else {
		lookupKey := &apiv1.Endpoints{
			ObjectMeta: metav1.ObjectMeta{
				Name:      s.Name,
				Namespace: s.Namespace,
			},
		}
		obj, found, getErr := s.endpointLister.Get(lookupKey)
		if getErr != nil {
			return getErr
		}
		if !found {
			klog.Warningf("Endpoint %s/%s does not exist. Skipping NEG sync", s.Namespace, s.Name)
			return nil
		}
		desired, podMap, err = s.endpointsCalculator.CalculateEndpoints(
			negtypes.EndpointsDataFromEndpoints(obj.(*apiv1.Endpoints)), negState)
	}
	// Both branches return early on their own lookup failures, so err can only
	// hold the outcome of CalculateEndpoints at this point.
	if err != nil {
		return fmt.Errorf("endpoints calculation error in mode %q, err: %w", s.endpointsCalculator.Mode(), err)
	}

	s.logStats(desired, "desired NEG endpoints")

	// Work out which endpoints must be attached and detached to move the NEG
	// from its current state to the desired one.
	toAdd, toRemove := calculateNetworkEndpointDifference(desired, negState)
	// The second result of this diff is used as the set of desired endpoints
	// that are already in the NEG.
	_, committed := calculateNetworkEndpointDifference(toAdd, desired)

	// Drop every endpoint that still has a transaction entry. An endpoint whose
	// in-flight operation conflicts with the new desired state (e.g. it is
	// being added while it should now be removed) has to wait for that
	// transaction to finish before it is reconciled.
	filterEndpointByTransaction(toAdd, s.transactions)
	filterEndpointByTransaction(toRemove, s.transactions)
	filterEndpointByTransaction(committed, s.transactions)

	if s.needCommit() {
		s.commitPods(committed, podMap)
	}

	if len(toAdd) > 0 || len(toRemove) > 0 {
		s.logEndpoints(toAdd, "adding endpoint")
		s.logEndpoints(toRemove, "removing endpoint")
		return s.syncNetworkEndpoints(toAdd, toRemove)
	}

	klog.V(4).Infof("No endpoint change for %s/%s, skip syncing NEG. ", s.Namespace, s.Name)
	return nil
}