in pilot/pkg/serviceregistry/serviceentry/servicediscovery.go [201:316]
func (s *Controller) workloadEntryHandler(old, curr config.Config, event model.Event) {
log.Debugf("Handle event %s for workload entry %s/%s", event, curr.Namespace, curr.Name)
var oldWle *networking.WorkloadEntry
if old.Spec != nil {
oldWle = convertWorkloadEntry(old)
}
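	// Convert the current spec as well, and write the converted form back onto
	// the config so everything below operates on the same normalized entry.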
	wle := convertWorkloadEntry(curr)
	curr.Spec = wle
	key := configKey{
		kind:      workloadEntryConfigType,
		name:      curr.Name,
		namespace: curr.Namespace,
	}
	// If an entry is unhealthy, we will mark this as a delete instead
	// This ensures we do not track unhealthy endpoints
	if features.WorkloadEntryHealthChecks && !isHealthy(curr) {
		event = model.EventDelete
	}
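	// Build the registry-agnostic WorkloadInstance view of this entry. Note:
	// DNSServiceEntryOnly is assumed to be set when the address is a DNS name
	// rather than an IP, in which case the entry can only back DNS-resolution
	// ServiceEntries and is not surfaced to the k8s workload handlers.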
	wi := s.convertWorkloadEntryToWorkloadInstance(curr, s.Cluster())
	if wi != nil && !wi.DNSServiceEntryOnly {
		// fire off the k8s handlers
		for _, h := range s.workloadHandlers {
			h(wi, event)
		}
	}
	// Includes new, updated, and unchanged instances; in other words, the
	// current state after this event.
	instancesUpdated := []*model.ServiceInstance{}
	instancesDeleted := []*model.ServiceInstance{}
	fullPush := false
	configsUpdated := map[model.ConfigKey]struct{}{}
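	// Endpoints of DNS-resolution ServiceEntries are delivered through cluster
	// configuration rather than incremental EDS, so an EDS-only update cannot
	// reach them; such entries force a full push instead.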
	addConfigs := func(se *networking.ServiceEntry, services []*model.Service) {
		// If the ServiceEntry's resolution is DNS, trigger a full push.
		// TODO: maybe cds?
		if se.Resolution == networking.ServiceEntry_DNS || se.Resolution == networking.ServiceEntry_DNS_ROUND_ROBIN {
			fullPush = true
			for key, value := range getUpdatedConfigs(services) {
				configsUpdated[key] = value
			}
		}
	}
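	// Collect the ServiceEntries in this namespace that select the entry's
	// labels, for both the old and the current label sets. A sketch of the
	// getWorkloadServiceEntries helper (assumed behavior) follows the function.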
	cfgs, _ := s.store.List(gvk.ServiceEntry, curr.Namespace)
	currSes := getWorkloadServiceEntries(cfgs, wle)
	var oldSes map[types.NamespacedName]*config.Config
	if oldWle != nil {
		if labels.Instance(oldWle.Labels).Equals(curr.Labels) {
			oldSes = currSes
		} else {
			oldSes = getWorkloadServiceEntries(cfgs, oldWle)
		}
	}
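	// unSelected: ServiceEntries that selected the old labels but no longer
	// select the current ones. A sketch of difference also follows the function.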
	unSelected := difference(oldSes, currSes)
	log.Debugf("workloadEntry %s/%s selected %v, unselected %v service entries", curr.Namespace, curr.Name, currSes, unSelected)
	s.mutex.Lock()
	for namespacedName, cfg := range currSes {
		services := s.services.getServices(namespacedName)
		se := cfg.Spec.(*networking.ServiceEntry)
		if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
			se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
			log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
			continue
		}
		instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
		instancesUpdated = append(instancesUpdated, instance...)
		addConfigs(se, services)
	}
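	// Mirror the loop above for the ServiceEntries that stopped selecting this
	// workload: rebuild the same instances so they can be deleted below.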
	for _, namespacedName := range unSelected {
		services := s.services.getServices(namespacedName)
		cfg := oldSes[namespacedName]
		se := cfg.Spec.(*networking.ServiceEntry)
		if wi.DNSServiceEntryOnly && se.Resolution != networking.ServiceEntry_DNS &&
			se.Resolution != networking.ServiceEntry_DNS_ROUND_ROBIN {
			log.Debugf("skip selecting workload instance %v/%v for DNS service entry %v", wi.Namespace, wi.Name, se.Hosts)
			continue
		}
		instance := s.convertWorkloadEntryToServiceInstances(wle, services, se, &key, s.Cluster())
		instancesDeleted = append(instancesDeleted, instance...)
		addConfigs(se, services)
	}
	s.serviceInstances.deleteInstances(key, instancesDeleted)
	if event == model.EventDelete {
		s.workloadInstances.Delete(wi)
		s.serviceInstances.deleteInstances(key, instancesUpdated)
	} else {
		s.workloadInstances.Insert(wi)
		s.serviceInstances.updateInstances(key, instancesUpdated)
	}
	s.mutex.Unlock()
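	// Both the surviving and the removed instances need an endpoint update, so
	// stale endpoints are dropped and current ones are (re)advertised.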
	allInstances := append(instancesUpdated, instancesDeleted...)
	if !fullPush {
		// On add, trigger a full xDS push for the related sidecar proxy.
		if event == model.EventAdd {
			s.XdsUpdater.ProxyUpdate(s.Cluster(), wle.Address)
		}
		s.edsUpdate(allInstances)
		return
	}
	// Update the EDS cache only; the endpoints are carried by the full push
	// triggered below.
	s.edsCacheUpdate(allInstances)
	pushReq := &model.PushRequest{
		Full:           true,
		ConfigsUpdated: configsUpdated,
		Reason:         []model.TriggerReason{model.EndpointUpdate},
	}
	// trigger a full push
	s.XdsUpdater.ConfigUpdate(pushReq)
}
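
// The helpers below are not part of this excerpt. These are minimal sketches
// of what they are assumed to do, inferred from how they are called above;
// the real implementations may differ.

// getWorkloadServiceEntries (sketch): select the ServiceEntries whose
// workloadSelector labels are a subset of the WorkloadEntry's labels.
func getWorkloadServiceEntries(ses []config.Config, wle *networking.WorkloadEntry) map[types.NamespacedName]*config.Config {
	sesByName := map[types.NamespacedName]*config.Config{}
	for i, cfg := range ses {
		se := cfg.Spec.(*networking.ServiceEntry)
		if se.WorkloadSelector != nil && labels.Instance(se.WorkloadSelector.Labels).SubsetOf(wle.Labels) {
			// Take the address of the slice element, not of the loop
			// variable, so the map does not alias a reused variable.
			sesByName[types.NamespacedName{Namespace: cfg.Namespace, Name: cfg.Name}] = &ses[i]
		}
	}
	return sesByName
}

// difference (sketch): the keys present in the old selection but absent from
// the current one, i.e. the ServiceEntries that stopped selecting this
// workload.
func difference(oldSes, currSes map[types.NamespacedName]*config.Config) []types.NamespacedName {
	deleted := make([]types.NamespacedName, 0, len(oldSes))
	for key := range oldSes {
		if _, ok := currSes[key]; !ok {
			deleted = append(deleted, key)
		}
	}
	return deleted
}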