in pilot/pkg/xds/delta.go [389:506]
func (s *DiscoveryServer) pushDeltaXds(con *Connection,
w *model.WatchedResource, req *model.PushRequest) error {
if w == nil {
return nil
}
gen := s.findGenerator(w.TypeUrl, con)
if gen == nil {
return nil
}
t0 := time.Now()
originalW := w
// If delta is set, client is requesting new resources or removing old ones. We should just generate the
// new resources it needs, rather than the entire set of known resources.
// Note: we do not need to account for unsubscribed resources as these are handled by parent removal;
// See https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#deleting-resources.
// This means if there are only removals, we will not respond.
var logFiltered string
if !req.Delta.IsEmpty() {
logFiltered = " filtered:" + strconv.Itoa(len(w.ResourceNames)-len(req.Delta.Subscribed))
w = &model.WatchedResource{
TypeUrl: w.TypeUrl,
ResourceNames: req.Delta.Subscribed.UnsortedList(),
}
}
var res model.Resources
var deletedRes model.DeletedResources
var logdata model.XdsLogDetails
var usedDelta bool
var err error
switch g := gen.(type) {
case model.XdsDeltaResourceGenerator:
res, deletedRes, logdata, usedDelta, err = g.GenerateDeltas(con.proxy, req, w)
if features.EnableUnsafeDeltaTest {
fullRes, _, _ := g.Generate(con.proxy, originalW, req)
s.compareDiff(con, originalW, fullRes, res, deletedRes, usedDelta, req.Delta)
}
case model.XdsResourceGenerator:
res, logdata, err = g.Generate(con.proxy, w, req)
}
if err != nil || (res == nil && deletedRes == nil) {
// If we have nothing to send, report that we got an ACK for this version.
if s.StatusReporter != nil {
s.StatusReporter.RegisterEvent(con.conID, w.TypeUrl, req.Push.LedgerVersion)
}
return err
}
defer func() { recordPushTime(w.TypeUrl, time.Since(t0)) }()
resp := &discovery.DeltaDiscoveryResponse{
ControlPlane: ControlPlane(),
TypeUrl: w.TypeUrl,
// TODO: send different version for incremental eds
SystemVersionInfo: req.Push.PushVersion,
Nonce: nonce(req.Push.LedgerVersion),
Resources: res,
}
currentResources := extractNames(res)
if usedDelta {
resp.RemovedResources = deletedRes
} else if req.Full {
// similar to sotw
subscribed := sets.New(w.ResourceNames...)
subscribed.DeleteAll(currentResources...)
resp.RemovedResources = subscribed.SortedList()
}
if len(resp.RemovedResources) > 0 {
deltaLog.Debugf("ADS:%v %s REMOVE %v", v3.GetShortType(w.TypeUrl), con.conID, resp.RemovedResources)
}
// normally wildcard xds `subscribe` is always nil, just in case there are some extended type not handled correctly.
if req.Delta.Subscribed == nil && isWildcardTypeURL(w.TypeUrl) {
// this is probably a bad idea...
con.proxy.Lock()
w.ResourceNames = currentResources
con.proxy.Unlock()
}
configSize := ResourceSize(res)
configSizeBytes.With(typeTag.Value(w.TypeUrl)).Record(float64(configSize))
ptype := "PUSH"
info := ""
if logdata.Incremental {
ptype = "PUSH INC"
}
if len(logdata.AdditionalInfo) > 0 {
info = " " + logdata.AdditionalInfo
}
if len(logFiltered) > 0 {
info += logFiltered
}
if err := con.sendDelta(resp); err != nil {
if recordSendError(w.TypeUrl, err) {
deltaLog.Warnf("%s: Send failure for node:%s resources:%d size:%s%s: %v",
v3.GetShortType(w.TypeUrl), con.proxy.ID, len(res), util.ByteCount(configSize), info, err)
}
return err
}
switch {
case logdata.Incremental:
if deltaLog.DebugEnabled() {
deltaLog.Debugf("%s: %s%s for node:%s resources:%d size:%s%s",
v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res), util.ByteCount(configSize), info)
}
default:
debug := ""
if deltaLog.DebugEnabled() {
// Add additional information to logs when debug mode enabled.
debug = " nonce:" + resp.Nonce + " version:" + resp.SystemVersionInfo
}
deltaLog.Infof("%s: %s%s for node:%s resources:%d size:%v%s%s", v3.GetShortType(w.TypeUrl), ptype, req.PushReason(), con.proxy.ID, len(res),
util.ByteCount(ResourceSize(res)), info, debug)
}
return nil
}