func()

in xds/client/pubsub/update.go [196:277]


func (pb *Pubsub) NewClusters(updates map[string]resource.ClusterUpdateErrTuple, metadata resource.UpdateMetadata) {
	pb.mu.Lock()
	defer pb.mu.Unlock()

	for k, update := range pb.cdsCache {
		if _, ok := updates[k]; !ok {
			// this is a delete event
			s, ok := pb.cdsWatchers[k]
			if !ok {
				s, ok = pb.cdsWatchers["*"]
			}
			if ok {
				for wi := range s {
					// delete
					update.ClusterName = "-" + update.ClusterName
					wi.newUpdate(update)
				}
			}
		}
	}

	for name, uErr := range updates {
		s, ok := pb.cdsWatchers[name]
		if !ok {
			s, ok = pb.cdsWatchers["*"]
		}
		if ok {
			if uErr.Err != nil {
				// On error, keep previous version for each resource. But update
				// status and error.
				mdCopy := pb.cdsMD[name]
				mdCopy.ErrState = metadata.ErrState
				mdCopy.Status = metadata.Status
				pb.cdsMD[name] = mdCopy
				for wi := range s {
					// Send the watcher the individual error, instead of the
					// overall combined error from the metadata.ErrState.
					wi.newError(uErr.Err)
				}
				continue
			}
			// If we get here, it means that the update is a valid one. Notify
			// watchers only if this is a first time update or it is different
			// from the one currently cached.
			if cur, ok := pb.cdsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
				for wi := range s {
					wi.newUpdate(uErr.Update)
				}
			}
			// Sync cache.
			pb.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
			pb.cdsCache[name] = uErr.Update
			// Set status to ACK, and clear error state. The metadata might be a
			// NACK metadata because some other resources in the same response
			// are invalid.
			mdCopy := metadata
			mdCopy.Status = resource.ServiceStatusACKed
			mdCopy.ErrState = nil
			if metadata.ErrState != nil {
				mdCopy.Version = metadata.ErrState.Version
			}
			pb.cdsMD[name] = mdCopy
		}
	}
	// Resources not in the new update were removed by the server, so delete
	// them.
	for name := range pb.cdsCache {
		if _, ok := updates[name]; !ok {
			// If resource exists in cache, but not in the new update, delete it
			// from cache, and also send an resource not found error to indicate
			// resource removed.
			delete(pb.cdsCache, name)
			pb.ldsMD[name] = resource.UpdateMetadata{Status: resource.ServiceStatusNotExist}
			for wi := range pb.cdsWatchers[name] {
				wi.resourceNotFound()
			}
		}
	}
	// When CDS resource is removed, we don't delete corresponding EDS cached
	// data. The EDS watch will be canceled, and cache entry is removed when the
	// last watch is canceled.
}