in istio/pkg/controllers/servicecenter/controller.go [102:173]
// getChangedServices compares the given service center services against the
// controller's service cache and returns the change events that must be pushed.
//
// Services named WATCHER_SVC_NAME are treated as app instance watchers, services
// named SERVICECENTER_ETCD_NAME or SERVICECENTER_MONGO_NAME are ignored, and
// everything else is diffed against the cache:
//   - not cached yet: EVT_CREATE (an app instance watcher is registered first
//     when the app has none in the watcher cache)
//   - cached but no longer deeply equal: EVT_UPDATE
//   - cached but missing from services: EVT_DELETE
//
// Before returning, newly created services are handed to initNewServices, the
// service cache is refreshed with the services seen in this pass, and the
// collected watcher ids are handed to checkAppInstanceWatchers.
//
// The returned slice is never nil.
func (c *Controller) getChangedServices(services []*discovery.MicroService) []event.ChangeEvent {
	// Events to push, in the order they are discovered.
	pending := make([]event.ChangeEvent, 0)
	// Entries for services that were not in the cache before this pass.
	created := make([]*event.MicroserviceEntry, 0)
	// Service id -> entry for every non-watcher service seen in this pass.
	var seen sync.Map
	// App id -> watcher service id for the watchers recorded in this pass.
	var liveWatchers sync.Map

	for _, svc := range services {
		svcName, appID, svcID := svc.ServiceName, svc.AppId, svc.ServiceId

		switch {
		case svcName == utils.WATCHER_SVC_NAME:
			// Only record the watcher when its id matches the one cached for its app.
			if known, ok := c.conn.AppInstanceWatcherCache.Load(appID); ok && known.(string) == svcID {
				liveWatchers.Store(appID, svcID)
			}

		case svcName == utils.SERVICECENTER_ETCD_NAME || svcName == utils.SERVICECENTER_MONGO_NAME:
			// Service center's own services are not tracked.

		default:
			fresh := &event.MicroserviceEntry{MicroService: svc}

			if cached, hit := c.serviceCache.Load(svcID); hit {
				previous := cached.(*event.MicroserviceEntry)
				if reflect.DeepEqual(svc, previous.MicroService) {
					// Unchanged: carry the cached entry over as-is.
					seen.Store(svcID, cached)
					continue
				}
				// Changed since the last pass.
				pending = append(pending, event.ChangeEvent{Action: discovery.EVT_UPDATE, Event: fresh})
				seen.Store(svcID, fresh)
				continue
			}

			// First time this service is seen; make sure its app has an instance watcher.
			if _, watched := c.conn.AppInstanceWatcherCache.Load(appID); !watched {
				watcherID, err := c.conn.RegisterAppInstanceWatcher(utils.WATCHER_SVC_NAME, appID, c.onInstanceUpdate)
				if err != nil {
					// Registration failed: skip this service for this pass, emitting no event.
					continue
				}
				liveWatchers.Store(appID, watcherID)
			}

			pending = append(pending, event.ChangeEvent{Action: discovery.EVT_CREATE, Event: fresh})
			created = append(created, fresh)
			seen.Store(svcID, fresh)
		}
	}

	// Anything still cached but not seen in this pass has been deleted.
	c.serviceCache.Range(func(cachedID, cachedEntry interface{}) bool {
		if _, alive := seen.Load(cachedID); alive {
			return true
		}
		pending = append(pending, event.ChangeEvent{
			Action: discovery.EVT_DELETE,
			Event:  cachedEntry.(*event.MicroserviceEntry),
		})
		return true
	})

	// Initial sync-up for the newly created services.
	c.initNewServices(created)
	// Replace the cached view with the services seen in this pass.
	c.refreshServiceCache(seen)
	// Reconcile app instance watchers against the ids recorded above.
	c.checkAppInstanceWatchers(liveWatchers)

	return pending
}