func()

in remoting/xds/client.go [269:355]


// registerHostLevelSubscription stores lst as the listener of svcUniqueName
// for hostAddr and, on the first subscription of that hostAddr, starts the xDS
// watches backing it:
//
//  1. an endpoint watch for every versioned cluster returned by
//     getAllVersionClusterName(hostAddr), kept in sync afterwards by a handler
//     appended to cdsUpdateEventHandlers;
//  2. a route-config watch whose latest update is cached in rdsMap.
//
// When hostAddr already has a subscription only the listener is stored and no
// new watch is created.
func (w *WrappedClientImpl) registerHostLevelSubscription(hostAddr, interfaceName, svcUniqueName string, lst registry.NotifyListener) {
	// 1. listen all cluster related endpoint
	w.hostAddrListenerMapLock.Lock()
	if listeners, ok := w.hostAddrListenerMap[hostAddr]; ok {
		// if subscription exist, register listener directly
		listeners[svcUniqueName] = lst
		w.hostAddrListenerMapLock.Unlock()
		return
	}
	// hostAddr key does not exist in either map yet, create both entries while
	// still holding the listener lock so a concurrent caller takes the fast
	// path above instead of initializing twice.
	w.hostAddrListenerMap[hostAddr] = map[string]registry.NotifyListener{svcUniqueName: lst}

	w.hostAddrClusterCtxMapLock.Lock()
	w.hostAddrClusterCtxMap[hostAddr] = make(map[string]ewatcher.EWatcher)
	w.hostAddrClusterCtxMapLock.Unlock()

	w.hostAddrListenerMapLock.Unlock()

	// watchCluster starts an endpoint watch for clusterName unless this
	// hostAddr is already watching it. It is shared by the CDS handler and the
	// initial registration below, which may run concurrently; the re-check
	// under the write lock makes sure the loser cancels its own watch instead
	// of overwriting (and thereby leaking) the winner's.
	watchCluster := func(clusterName string) {
		w.hostAddrClusterCtxMapLock.RLock()
		_, watching := w.hostAddrClusterCtxMap[hostAddr][clusterName]
		w.hostAddrClusterCtxMapLock.RUnlock()
		if watching {
			return
		}

		watcher := ewatcher.NewEndpointWatcherCtxImpl(
			clusterName, hostAddr, interfaceName, &w.hostAddrListenerMapLock, w.hostAddrListenerMap)
		cancel := w.xdsClient.WatchEndpoints(clusterName, watcher.Handle)
		watcher.SetCancelFunction(cancel)

		w.hostAddrClusterCtxMapLock.Lock()
		if _, raced := w.hostAddrClusterCtxMap[hostAddr][clusterName]; raced {
			w.hostAddrClusterCtxMapLock.Unlock()
			// Another goroutine registered this cluster in the meantime; drop
			// only the redundant xDS watch, the registered watcher stays.
			cancel()
			return
		}
		w.hostAddrClusterCtxMap[hostAddr][clusterName] = watcher
		w.hostAddrClusterCtxMapLock.Unlock()
	}

	// watch cluster change, and start listening newcoming cluster
	w.cdsUpdateEventHandlersLock.Lock()
	w.cdsUpdateEventHandlers = append(w.cdsUpdateEventHandlers, func() {
		// todo @(laurnece) now this event would be called if any cluster is change, but not only this hostAddr's
		updatedAllVersionedClusterName := w.getAllVersionClusterName(hostAddr)

		// Copy the currently watched cluster names while holding the read
		// lock. The inner map is written under the same lock elsewhere, so it
		// must not be ranged over or indexed after the lock is released.
		w.hostAddrClusterCtxMapLock.RLock()
		staleClusters := make(map[string]struct{}, len(w.hostAddrClusterCtxMap[hostAddr]))
		for cluster := range w.hostAddrClusterCtxMap[hostAddr] {
			staleClusters[cluster] = struct{}{}
		}
		w.hostAddrClusterCtxMapLock.RUnlock()

		for _, updatedClusterName := range updatedAllVersionedClusterName {
			if _, ok := staleClusters[updatedClusterName]; ok {
				// already listening, so it is not stale
				delete(staleClusters, updatedClusterName)
				continue
			}
			// new cluster
			watchCluster(updatedClusterName)
		}

		// cancel clusters that are no longer part of the updated cluster list
		for cluster := range staleClusters {
			w.hostAddrClusterCtxMapLock.Lock()
			watchCtx, ok := w.hostAddrClusterCtxMap[hostAddr][cluster]
			if ok {
				delete(w.hostAddrClusterCtxMap[hostAddr], cluster)
			}
			w.hostAddrClusterCtxMapLock.Unlock()

			// Destroy runs outside the lock. NOTE(review): the watcher was
			// built with a pointer to hostAddrListenerMapLock; if Destroy
			// acquires it, calling Destroy under hostAddrClusterCtxMapLock
			// would invert the lock order used at the top of this function.
			if ok {
				watchCtx.Destroy()
			}
		}
	})
	w.cdsUpdateEventHandlersLock.Unlock()

	// update cluster of now; the handler above is registered first so that a
	// cluster change arriving during this loop is not missed.
	for _, c := range w.getAllVersionClusterName(hostAddr) {
		watchCluster(c)
	}

	// 2. cache route config
	// todo @(laurnece) cancel watching of this addr's rds
	_ = w.xdsClient.WatchRouteConfig(hostAddr, func(update resource.RouteConfigUpdate, err error) {
		// Keep the previously cached route config when the watch reports an
		// error or delivers an update without virtual hosts.
		if err != nil || update.VirtualHosts == nil {
			return
		}
		w.rdsMapLock.Lock()
		defer w.rdsMapLock.Unlock()
		w.rdsMap[hostAddr] = update
	})
}