in xds/client/pubsub/update.go [196:277]
func (pb *Pubsub) NewClusters(updates map[string]resource.ClusterUpdateErrTuple, metadata resource.UpdateMetadata) {
pb.mu.Lock()
defer pb.mu.Unlock()
for k, update := range pb.cdsCache {
if _, ok := updates[k]; !ok {
// this is a delete event
s, ok := pb.cdsWatchers[k]
if !ok {
s, ok = pb.cdsWatchers["*"]
}
if ok {
for wi := range s {
// delete
update.ClusterName = "-" + update.ClusterName
wi.newUpdate(update)
}
}
}
}
for name, uErr := range updates {
s, ok := pb.cdsWatchers[name]
if !ok {
s, ok = pb.cdsWatchers["*"]
}
if ok {
if uErr.Err != nil {
// On error, keep previous version for each resource. But update
// status and error.
mdCopy := pb.cdsMD[name]
mdCopy.ErrState = metadata.ErrState
mdCopy.Status = metadata.Status
pb.cdsMD[name] = mdCopy
for wi := range s {
// Send the watcher the individual error, instead of the
// overall combined error from the metadata.ErrState.
wi.newError(uErr.Err)
}
continue
}
// If we get here, it means that the update is a valid one. Notify
// watchers only if this is a first time update or it is different
// from the one currently cached.
if cur, ok := pb.cdsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
for wi := range s {
wi.newUpdate(uErr.Update)
}
}
// Sync cache.
pb.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
pb.cdsCache[name] = uErr.Update
// Set status to ACK, and clear error state. The metadata might be a
// NACK metadata because some other resources in the same response
// are invalid.
mdCopy := metadata
mdCopy.Status = resource.ServiceStatusACKed
mdCopy.ErrState = nil
if metadata.ErrState != nil {
mdCopy.Version = metadata.ErrState.Version
}
pb.cdsMD[name] = mdCopy
}
}
// Resources not in the new update were removed by the server, so delete
// them.
for name := range pb.cdsCache {
if _, ok := updates[name]; !ok {
// If resource exists in cache, but not in the new update, delete it
// from cache, and also send an resource not found error to indicate
// resource removed.
delete(pb.cdsCache, name)
pb.ldsMD[name] = resource.UpdateMetadata{Status: resource.ServiceStatusNotExist}
for wi := range pb.cdsWatchers[name] {
wi.resourceNotFound()
}
}
}
// When CDS resource is removed, we don't delete corresponding EDS cached
// data. The EDS watch will be canceled, and cache entry is removed when the
// last watch is canceled.
}