func()

in istio/pkg/controllers/servicecenter/controller.go [102:173]


// getChangedServices diffs the given service center services against the
// controller's service cache and returns the change events that describe the
// difference:
//
//   - EVT_CREATE for every regular service whose id is not in c.serviceCache,
//   - EVT_UPDATE for every cached service whose MicroService is no longer
//     deeply equal to the cached one,
//   - EVT_DELETE for every cached service that is absent from services.
//
// Services named utils.SERVICECENTER_ETCD_NAME or utils.SERVICECENTER_MONGO_NAME
// are ignored. Services named utils.WATCHER_SVC_NAME are never reported as
// changes; they only contribute to the set of watcher ids handed to
// c.checkAppInstanceWatchers.
//
// Before returning, the method calls c.initNewServices with the newly created
// entries, c.refreshServiceCache with the current set of regular services and
// c.checkAppInstanceWatchers with the collected watcher ids, in that order.
func (c *Controller) getChangedServices(services []*discovery.MicroService) []event.ChangeEvent {
	var (
		// Events to hand back to the caller; kept non-nil even when empty.
		changes = []event.ChangeEvent{}
		// Entries for regular services seen for the first time in this pass.
		created = []*event.MicroserviceEntry{}
		// Regular services present in this pass, keyed by service id.
		live sync.Map
		// Watcher service ids observed or registered in this pass, keyed by app id.
		watcherIDs sync.Map
	)

	for _, svc := range services {
		svcName, appID, svcID := svc.ServiceName, svc.AppId, svc.ServiceId

		switch {
		case svcName == utils.WATCHER_SVC_NAME:
			// Record the watcher only when it is the one the connection cache
			// already holds for this app.
			if known, ok := c.conn.AppInstanceWatcherCache.Load(appID); ok && known.(string) == svcID {
				watcherIDs.Store(appID, svcID)
			}

		case svcName == utils.SERVICECENTER_ETCD_NAME || svcName == utils.SERVICECENTER_MONGO_NAME:
			// Not a regular service and not a watcher: nothing to track.

		default:
			entry := &event.MicroserviceEntry{MicroService: svc}

			cached, seen := c.serviceCache.Load(svcID)
			if !seen {
				// First sighting of this service. Its app needs a watcher
				// unless the connection cache already has one.
				if _, watched := c.conn.AppInstanceWatcherCache.Load(appID); !watched {
					newWatcherID, err := c.conn.RegisterAppInstanceWatcher(utils.WATCHER_SVC_NAME, appID, c.onInstanceUpdate)
					if err != nil {
						// Skip the service entirely; since it is not added to
						// the current set, it stays out of the cache passed to
						// refreshServiceCache.
						continue
					}
					watcherIDs.Store(appID, newWatcherID)
				}
				changes = append(changes, event.ChangeEvent{Action: discovery.EVT_CREATE, Event: entry})
				created = append(created, entry)
				live.Store(svcID, entry)
				continue
			}

			previous := cached.(*event.MicroserviceEntry)
			if reflect.DeepEqual(svc, previous.MicroService) {
				// Unchanged: carry the cached entry over as-is.
				live.Store(svcID, cached)
				continue
			}

			// Known service whose definition differs from the cached one.
			changes = append(changes, event.ChangeEvent{Action: discovery.EVT_UPDATE, Event: entry})
			live.Store(svcID, entry)
		}
	}

	// Anything still cached but not seen in this pass has been deleted.
	c.serviceCache.Range(func(cachedID, cachedEntry interface{}) bool {
		if _, present := live.Load(cachedID); present {
			return true
		}
		changes = append(changes, event.ChangeEvent{
			Action: discovery.EVT_DELETE,
			Event:  cachedEntry.(*event.MicroserviceEntry),
		})
		return true
	})

	// Initial sync-up for the newly created services.
	c.initNewServices(created)
	// Replace the service cache contents with the current services.
	c.refreshServiceCache(live)
	// Reconcile app instance watchers against the ids collected above.
	c.checkAppInstanceWatchers(watcherIDs)

	return changes
}