in remoting/xds/client.go [269:355]
// registerHostLevelSubscription registers lst as the listener of svcUniqueName
// for hostAddr and, on the first registration for that hostAddr, starts the
// xDS watches backing it.
//
// If hostAddr already has an entry in hostAddrListenerMap, the listener is
// stored there and the function returns without starting anything new.
// Otherwise it:
//  1. creates the listener map and the cluster-watcher map for hostAddr,
//  2. appends a CDS update handler that reconciles the endpoint watchers of
//     hostAddr with the cluster names returned by getAllVersionClusterName,
//  3. starts an endpoint watch for every cluster name currently returned,
//  4. starts a route config watch whose updates are cached in rdsMap.
//
// interfaceName is passed through to every endpoint watcher that is created.
func (w *WrappedClientImpl) registerHostLevelSubscription(hostAddr, interfaceName, svcUniqueName string, lst registry.NotifyListener) {
	// 1. listen all cluster related endpoint
	w.hostAddrListenerMapLock.Lock()
	if _, ok := w.hostAddrListenerMap[hostAddr]; ok {
		// if subscription exist, register listener directly
		w.hostAddrListenerMap[hostAddr][svcUniqueName] = lst
		w.hostAddrListenerMapLock.Unlock()
		return
	}
	// host HostAddr key must not exist in map, create one
	w.hostAddrListenerMap[hostAddr] = make(map[string]registry.NotifyListener)
	w.hostAddrClusterCtxMapLock.Lock()
	w.hostAddrClusterCtxMap[hostAddr] = make(map[string]ewatcher.EWatcher)
	w.hostAddrClusterCtxMapLock.Unlock()
	w.hostAddrListenerMap[hostAddr][svcUniqueName] = lst
	w.hostAddrListenerMapLock.Unlock()

	// watchCluster starts an endpoint watch for cluster and records the watcher
	// under hostAddr. The watch is started before the lock is taken, so the lock
	// is never held across the xDS client call.
	watchCluster := func(cluster string) {
		watcher := ewatcher.NewEndpointWatcherCtxImpl(
			cluster, hostAddr, interfaceName, &w.hostAddrListenerMapLock, w.hostAddrListenerMap)
		watcher.SetCancelFunction(w.xdsClient.WatchEndpoints(cluster, watcher.Handle))
		w.hostAddrClusterCtxMapLock.Lock()
		w.hostAddrClusterCtxMap[hostAddr][cluster] = watcher
		w.hostAddrClusterCtxMapLock.Unlock()
	}

	// watch cluster change, and start listening newcoming cluster
	w.cdsUpdateEventHandlersLock.Lock()
	w.cdsUpdateEventHandlers = append(w.cdsUpdateEventHandlers, func() {
		// todo @(laurnece) now this event would be called if any cluster is change, but not only this hostAddr's
		updatedClusterNames := w.getAllVersionClusterName(hostAddr)

		// Snapshot the currently watched cluster names. The per-host map is
		// written under hostAddrClusterCtxMapLock, so it must only be ranged
		// over or indexed while that lock is held; iterating it unlocked would
		// be a data race with those writers.
		// Value false means "not seen in the updated list yet".
		w.hostAddrClusterCtxMapLock.RLock()
		watched := w.hostAddrClusterCtxMap[hostAddr]
		seen := make(map[string]bool, len(watched))
		for cluster := range watched {
			seen[cluster] = false
		}
		w.hostAddrClusterCtxMapLock.RUnlock()

		for _, name := range updatedClusterNames {
			// Check against the live map (under the read lock) so a cluster that
			// was registered after the snapshot is still recognised.
			w.hostAddrClusterCtxMapLock.RLock()
			_, watching := w.hostAddrClusterCtxMap[hostAddr][name]
			w.hostAddrClusterCtxMapLock.RUnlock()
			if watching {
				// already listening
				seen[name] = true
				continue
			}
			// new cluster
			watchCluster(name)
		}

		// cancel not exist cluster
		for cluster, stillListed := range seen {
			if stillListed {
				continue
			}
			// this cluster not exist in update cluster list; removal from the
			// map and Destroy both happen under the write lock.
			w.hostAddrClusterCtxMapLock.Lock()
			if watchCtx, ok := w.hostAddrClusterCtxMap[hostAddr][cluster]; ok {
				delete(w.hostAddrClusterCtxMap[hostAddr], cluster)
				watchCtx.Destroy()
			}
			w.hostAddrClusterCtxMapLock.Unlock()
		}
	})
	w.cdsUpdateEventHandlersLock.Unlock()

	// update cluster of now
	// NOTE(review): the CDS handler above is already registered at this point;
	// if handlers can run concurrently with this loop, the same cluster may be
	// watched twice and the later store replaces the earlier watcher — confirm.
	for _, c := range w.getAllVersionClusterName(hostAddr) {
		watchCluster(c)
	}

	// 2. cache route config
	// todo @(laurnece) cancel watching of this addr's rds
	// The cancel function returned by WatchRouteConfig is discarded, so this
	// watch is never cancelled from here.
	_ = w.xdsClient.WatchRouteConfig(hostAddr, func(update resource.RouteConfigUpdate, err error) {
		// err is not inspected; an update without virtual hosts is ignored and
		// leaves the previously cached route config in place.
		if update.VirtualHosts == nil {
			return
		}
		w.rdsMapLock.Lock()
		defer w.rdsMapLock.Unlock()
		w.rdsMap[hostAddr] = update
	})
}