func()

in pilot/pkg/serviceregistry/serviceentry/servicediscovery.go [201:316]


func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.Event) {
	log.Debugf("Handle event %s for workload entry %s/%s", event, curr.Namespace, curr.Name)
	var oldWle *networking.WorkloadEntry
	if old.Spec != nil {
		oldWle = convertWorkloadEntry(old)
	}
	wle := convertWorkloadEntry(curr)
	curr.Spec = wle
	key := configKey{
		kind:      workloadEntryConfigType,
		name:      curr.Name,
		namespace: curr.Namespace,
	}

	// If an entry is unhealthy, we will mark this as a delete instead
	// This ensures we do not track unhealthy endpoints
	if features.WorkloadEntryHealthChecks && !isHealthy(curr) {
		event = model.EventDelete
	}

	wi := s.convertWorkloadEntryToWorkloadInstance(curr, s.Cluster())
	if wi != nil && !wi.DNSServiceEntryOnly {
		// fire off the k8s handlers
		for _, h := range s.workloadHandlers {
			h(wi, event)
		}
	}

	// includes instances new updated or unchanged, in other word it is the current state.
	instancesUpdated := []*model.ServiceInstance{}
	instancesDeleted := []*model.ServiceInstance{}
	fullPush := false
	configsUpdated := map[model.ConfigKey]struct{}{}

	addConfigs := func(se *networking.ServiceEntry, services []*model.Service) {
		// If serviceentry's resolution is DNS, make a full push
		// TODO: maybe cds?
		if se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
			fullPush = true
			for key, value := range getUpdatedConfigs(services) {
				configsUpdated[key] = value
			}
		}
	}

	cfgs, _ := s.store.List(gvk.ServiceEntry, curr.Namespace)
	currSes := getWorkloadServiceEntries(cfgs, wle)
	var oldSes map[types.NamespacedName]*config.Config
	if oldWle != nil {
		if labels.Instance(oldWle.Labels).Equals(curr.Labels) {
			oldSes = currSes
		} else {
			oldSes = getWorkloadServiceEntries(cfgs, oldWle)
		}
	}
	unSelected := difference(oldSes, currSes)
	log.Debugf("workloadEntry %s/%s selected %v, unSelected %v serviceEntry", curr.Namespace, curr.Name, currSes, unSelected)
	s.mutex.Lock()
	for namespacedName, cfg := range currSes {
		services := s.services.getServices(namespacedName)
		se := cfg.Spec.(*networking.ServiceEntry)
		if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
			se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
			log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
			continue
		}
		instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
		instancesUpdated = append(instancesUpdated, instance...)
		addConfigs(se, services)
	}

	for _, namespacedName := range unSelected {
		services := s.services.getServices(namespacedName)
		cfg := oldSes[namespacedName]
		se := cfg.Spec.(*networking.ServiceEntry)
		if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
			se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
			log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
			continue
		}
		instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
		instancesDeleted = append(instancesDeleted, instance...)
		addConfigs(se, services)
	}

	s.serviceInstances.deleteInstances(key, instancesDeleted)
	if event == model.EventDelete {
		s.workloadInstances.Delete(wi)
		s.serviceInstances.deleteInstances(key, instancesUpdated)
	} else {
		s.workloadInstances.Insert(wi)
		s.serviceInstances.updateInstances(key, instancesUpdated)
	}
	s.mutex.Unlock()

	allInstances := append(instancesUpdated, instancesDeleted...)
	if !fullPush {
		// trigger full xds push to the related sidecar proxy
		if event == model.EventAdd {
			s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
		}
		s.edsUpdate(allInstances)
		return
	}

	// update eds cache only
	s.edsCacheUpdate(allInstances)

	pushReq := &model.PushRequest{
		Full:           true,
		ConfigsUpdated: configsUpdated,
		Reason:         []model.TriggerReason{model.EndpointUpdate},
	}
	// trigger a full push
	s.XdsUpdater.ConfigUpdate(pushReq)
}