func()

in pkg/xds/server/callbacks/dataplane_status_tracker.go [148:231]


// OnStreamRequest updates the per-stream status for a DiscoveryRequest that
// arrived on the stream identified by streamID.
//
// On the first request for which the stream has no dataplane ID yet, the ID is
// parsed from the request's node ID. When parsing succeeds, the resource type
// is derived from the proxy type in the node metadata, the proxy version from
// the metadata is stored on the subscription, and the asynchronous status sink
// for the stream is started. A parse failure is logged and will be retried on
// the next request, because the dataplane ID stays empty.
//
// For every request, a non-empty response nonce is counted as either a
// rejected response (req.HasErrors()) or an acknowledged one, both in the
// subscription totals and in the per-type stats. A request without a nonce is
// only logged as "config requested".
//
// The method always returns nil.
func (c *dataplaneStatusTracker) OnStreamRequest(streamID int64, req util_xds.DiscoveryRequest) error {
	c.mu.RLock() // read access to the map of all ADS streams
	defer c.mu.RUnlock()

	// NOTE(review): the lookup result is used without a presence check; this
	// assumes the stream is always registered before a request arrives on it
	// — confirm against the stream open/close callbacks.
	state := c.streams[streamID]

	state.mu.Lock() // write access to the per Dataplane info
	defer state.mu.Unlock()

	if state.dataplaneId == (core_model.ResourceKey{}) {
		// Infer the Dataplane ID.
		if proxyId, err := core_xds.ParseProxyIdFromString(req.NodeId()); err == nil {
			state.dataplaneId = proxyId.ToResourceKey()
			var dpType core_model.ResourceType
			md := core_xds.DataplaneMetadataFromXdsMetadata(req.Metadata(), os.TempDir(), state.dataplaneId)

			// If the dataplane was started with a resource YAML, then it
			// will be serialized in the node metadata and we would know
			// the underlying type directly. Since that is optional, we
			// can't depend on it here, so we map from the proxy type,
			// which is guaranteed.
			//
			// NOTE(review): any proxy type not listed below leaves dpType
			// at its zero value — verify that this is intended.
			switch md.GetProxyType() {
			case mesh_proto.IngressProxyType:
				dpType = core_mesh.ZoneIngressType
			case mesh_proto.DataplaneProxyType:
				dpType = core_mesh.DataplaneType
			}

			log := statusTrackerLog.WithValues(
				"proxyName", state.dataplaneId.Name,
				"mesh", state.dataplaneId.Mesh,
				"streamID", streamID,
				"type", md.GetProxyType(),
				"subscriptionID", state.subscription.Id,
			)
			if statusTrackerLog.V(1).Enabled() {
				log = log.WithValues("node", req.Node())
			}
			log.Info("proxy connected")

			// Record the proxy version carried in the node metadata and only
			// report a problem when it is actually missing. There is no
			// underlying error to attach: err is nil on this branch.
			if version := md.GetVersion(); version != nil {
				state.subscription.Version = version
			} else {
				statusTrackerLog.Error(nil, "failed to extract version out of the Envoy metadata", "streamid", streamID, "metadata", req.Metadata())
			}
			// Kick off the async Dataplane status flusher.
			go c.createStatusSink(dpType, state).Start(state.stop)
		} else {
			statusTrackerLog.Error(err, "failed to parse Dataplane Id out of DiscoveryRequest", "streamid", streamID, "req", req)
		}
	}

	subscription := state.subscription
	log := statusTrackerLog.WithValues(
		"proxyName", state.dataplaneId.Name,
		"mesh", state.dataplaneId.Mesh,
		"streamID", streamID,
		"type", shortEnvoyType(req.GetTypeUrl()),
		"resourceVersion", req.VersionInfo(),
	)
	if statusTrackerLog.V(1).Enabled() {
		log = log.WithValues(
			"resourceNames", req.GetResourceNames(),
			"subscriptionID", subscription.Id,
			"nonce", req.GetResponseNonce(),
		)
	}

	// update Dataplane status
	if req.GetResponseNonce() != "" {
		// A request that carries a nonce is a reply to a response that was
		// sent earlier: count it as rejected or acknowledged.
		subscription.Status.LastUpdateTime = util_proto.MustTimestampProto(core.Now())
		if req.HasErrors() {
			log.Info("config rejected")
			subscription.Status.Total.ResponsesRejected++
			subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesRejected++
		} else {
			log.V(1).Info("config accepted")
			subscription.Status.Total.ResponsesAcknowledged++
			subscription.Status.StatsOf(req.GetTypeUrl()).ResponsesAcknowledged++
		}
	} else {
		if !statusTrackerLog.V(1).Enabled() { // it was already added, no need to add it twice
			log = log.WithValues("resourceNames", req.GetResourceNames())
		}
		log.Info("config requested")
	}
	return nil
}