in pkg/xds/server/callbacks/dataplane_status_tracker.go [148:231]
func (c *dataplaneStatusTracker) OnStreamRequest(streamID int64, req util_xds.DiscoveryRequest) error {
c.mu.RLock() // read access to the map of all ADS streams
defer c.mu.RUnlock()
state := c.streams[streamID]
state.mu.Lock() // write access to the per Dataplane info
defer state.mu.Unlock()
if state.dataplaneId == (core_model.ResourceKey{}) {
// Infer the Dataplane ID.
if proxyId, err := core_xds.ParseProxyIdFromString(req.NodeId()); err == nil {
state.dataplaneId = proxyId.ToResourceKey()
var dpType core_model.ResourceType
md := core_xds.DataplaneMetadataFromXdsMetadata(req.Metadata(), os.TempDir(), state.dataplaneId)
// If the dataplane was started with a resource YAML, then it
// will be serialized in the node metadata and we would know
// the underlying type directly. Since that is optional, we
// can't depend on it here, so we map from the proxy type,
// which is guaranteed.
switch md.GetProxyType() {
case mesh_proto.IngressProxyType:
dpType = core_mesh.ZoneIngressType
case mesh_proto.DataplaneProxyType:
dpType = core_mesh.DataplaneType
}
log := statusTrackerLog.WithValues(
"proxyName", state.dataplaneId.Name,
"mesh", state.dataplaneId.Mesh,
"streamID", streamID,
"type", md.GetProxyType(),
"subscriptionID", state.subscription.Id,
)
if statusTrackerLog.V(1).Enabled() {
log = log.WithValues("node", req.Node())
}
log.Info("proxy connected")
statusTrackerLog.Error(err, "failed to extract version out of the Envoy metadata", "streamid", streamID, "metadata", req.Metadata())
// Kick off the async Dataplane status flusher.
go c.createStatusSink(dpType, state).Start(state.stop)
} else {
statusTrackerLog.Error(err, "failed to parse Dataplane Id out of DiscoveryRequest", "streamid", streamID, "req", req)
}
}
subscription := state.subscription
log := statusTrackerLog.WithValues(
"proxyName", state.dataplaneId.Name,
"mesh", state.dataplaneId.Mesh,
"streamID", streamID,
"type", shortEnvoyType(req.GetTypeUrl()),
"resourceVersion", req.VersionInfo(),
)
if statusTrackerLog.V(1).Enabled() {
log = log.WithValues(
"resourceNames", req.GetResourceNames(),
"subscriptionID", subscription.Id,
"nonce", req.GetResponseNonce(),
)
}
// update Dataplane status
if req.GetResponseNonce() != "" {
subscription.Status.LastUpdateTime = util_proto.MustTimestampProto(core.Now())
if req.HasErrors() {
log.Info("config rejected")
subscription.Status.Total.ResponsesRejected++
subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesRejected++
} else {
log.V(1).Info("config accepted")
subscription.Status.Total.ResponsesAcknowledged++
subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesAcknowledged++
}
} else {
if !statusTrackerLog.V(1).Enabled() { // it was already added, no need to add it twice
log = log.WithValues("resourceNames", req.GetResourceNames())
}
log.Info("config requested")
}
return nil
}