in xds/client/pubsub/watch.go [137:244]
func (pb *Pubsub) watch(wi *watchInfo) (first bool, cancel func() bool) {
pb.mu.Lock()
defer pb.mu.Unlock()
pb.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)
var (
watchers map[string]map[*watchInfo]bool
mds map[string]resource.UpdateMetadata
)
switch wi.rType {
case resource.ListenerResource:
watchers = pb.ldsWatchers
mds = pb.ldsMD
case resource.RouteConfigResource:
watchers = pb.rdsWatchers
mds = pb.rdsMD
case resource.ClusterResource:
watchers = pb.cdsWatchers
mds = pb.cdsMD
case resource.EndpointsResource:
watchers = pb.edsWatchers
mds = pb.edsMD
default:
pb.logger.Errorf("unknown watch type: %v", wi.rType)
return false, nil
}
var firstWatcher bool
resourceName := wi.target
s, ok := watchers[wi.target]
if !ok {
// If this is a new watcher, will ask lower level to send a new request
// with the resource name.
//
// If this (type+name) is already being watched, will not notify the
// underlying versioned apiClient.
pb.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target)
s = make(map[*watchInfo]bool)
watchers[resourceName] = s
mds[resourceName] = resource.UpdateMetadata{Status: resource.ServiceStatusRequested}
firstWatcher = true
}
// No matter what, add the new watcher to the set, so it's callback will be
// call for new responses.
s[wi] = true
// If the resource is in cache, call the callback with the value.
switch wi.rType {
case resource.ListenerResource:
if v, ok := pb.ldsCache[resourceName]; ok {
pb.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
wi.newUpdate(v)
}
case resource.RouteConfigResource:
if v, ok := pb.rdsCache[resourceName]; ok {
pb.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
wi.newUpdate(v)
}
case resource.ClusterResource:
if v, ok := pb.cdsCache["*"]; ok {
pb.logger.Debugf("CDS resource with name * found in cache: %+v", pretty.ToJSON(v))
wi.newUpdate(v)
}
if v, ok := pb.cdsCache[resourceName]; ok {
pb.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
wi.newUpdate(v)
}
case resource.EndpointsResource:
if v, ok := pb.edsCache[resourceName]; ok {
pb.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
wi.newUpdate(v)
}
}
return firstWatcher, func() bool {
pb.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target)
wi.cancel()
pb.mu.Lock()
defer pb.mu.Unlock()
var lastWatcher bool
if s := watchers[resourceName]; s != nil {
// Remove this watcher, so it's callback will not be called in the
// future.
delete(s, wi)
if len(s) == 0 {
pb.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target)
// If this was the last watcher, also tell xdsv2Client to stop
// watching this resource.
delete(watchers, resourceName)
delete(mds, resourceName)
lastWatcher = true
// Remove the resource from cache. When a watch for this
// resource is added later, it will trigger a xDS request with
// resource names, and client will receive new xDS responses.
switch wi.rType {
case resource.ListenerResource:
delete(pb.ldsCache, resourceName)
case resource.RouteConfigResource:
delete(pb.rdsCache, resourceName)
case resource.ClusterResource:
delete(pb.cdsCache, resourceName)
case resource.EndpointsResource:
delete(pb.edsCache, resourceName)
}
}
}
return lastWatcher
}
}