func()

in xds/client/pubsub/watch.go [137:244]


func (pb *Pubsub) watch(wi *watchInfo) (first bool, cancel func() bool) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	pb.logger.Debugf("new watch for type %v, resource name %v", wi.rType, wi.target)
	var (
		watchers map[string]map[*watchInfo]bool
		mds      map[string]resource.UpdateMetadata
	)
	switch wi.rType {
	case resource.ListenerResource:
		watchers = pb.ldsWatchers
		mds = pb.ldsMD
	case resource.RouteConfigResource:
		watchers = pb.rdsWatchers
		mds = pb.rdsMD
	case resource.ClusterResource:
		watchers = pb.cdsWatchers
		mds = pb.cdsMD
	case resource.EndpointsResource:
		watchers = pb.edsWatchers
		mds = pb.edsMD
	default:
		pb.logger.Errorf("unknown watch type: %v", wi.rType)
		return false, nil
	}

	var firstWatcher bool
	resourceName := wi.target
	s, ok := watchers[wi.target]
	if !ok {
		// If this is a new watcher, will ask lower level to send a new request
		// with the resource name.
		//
		// If this (type+name) is already being watched, will not notify the
		// underlying versioned apiClient.
		pb.logger.Debugf("first watch for type %v, resource name %v, will send a new xDS request", wi.rType, wi.target)
		s = make(map[*watchInfo]bool)
		watchers[resourceName] = s
		mds[resourceName] = resource.UpdateMetadata{Status: resource.ServiceStatusRequested}
		firstWatcher = true
	}
	// No matter what, add the new watcher to the set, so it's callback will be
	// call for new responses.
	s[wi] = true

	// If the resource is in cache, call the callback with the value.
	switch wi.rType {
	case resource.ListenerResource:
		if v, ok := pb.ldsCache[resourceName]; ok {
			pb.logger.Debugf("LDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
			wi.newUpdate(v)
		}
	case resource.RouteConfigResource:
		if v, ok := pb.rdsCache[resourceName]; ok {
			pb.logger.Debugf("RDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
			wi.newUpdate(v)
		}
	case resource.ClusterResource:
		if v, ok := pb.cdsCache["*"]; ok {
			pb.logger.Debugf("CDS resource with name * found in cache: %+v", pretty.ToJSON(v))
			wi.newUpdate(v)
		}
		if v, ok := pb.cdsCache[resourceName]; ok {
			pb.logger.Debugf("CDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
			wi.newUpdate(v)
		}
	case resource.EndpointsResource:
		if v, ok := pb.edsCache[resourceName]; ok {
			pb.logger.Debugf("EDS resource with name %v found in cache: %+v", wi.target, pretty.ToJSON(v))
			wi.newUpdate(v)
		}
	}

	return firstWatcher, func() bool {
		pb.logger.Debugf("watch for type %v, resource name %v canceled", wi.rType, wi.target)
		wi.cancel()
		pb.mu.Lock()
		defer pb.mu.Unlock()
		var lastWatcher bool
		if s := watchers[resourceName]; s != nil {
			// Remove this watcher, so it's callback will not be called in the
			// future.
			delete(s, wi)
			if len(s) == 0 {
				pb.logger.Debugf("last watch for type %v, resource name %v canceled, will send a new xDS request", wi.rType, wi.target)
				// If this was the last watcher, also tell xdsv2Client to stop
				// watching this resource.
				delete(watchers, resourceName)
				delete(mds, resourceName)
				lastWatcher = true
				// Remove the resource from cache. When a watch for this
				// resource is added later, it will trigger a xDS request with
				// resource names, and client will receive new xDS responses.
				switch wi.rType {
				case resource.ListenerResource:
					delete(pb.ldsCache, resourceName)
				case resource.RouteConfigResource:
					delete(pb.rdsCache, resourceName)
				case resource.ClusterResource:
					delete(pb.cdsCache, resourceName)
				case resource.EndpointsResource:
					delete(pb.edsCache, resourceName)
				}
			}
		}
		return lastWatcher
	}
}