func()

in internal/ingress/metric/collectors/socket.go [234:339]


func (sc *SocketCollector) handleMessage(msg []byte) {
	klog.V(5).InfoS("Metric", "message", string(msg))

	// Unmarshal bytes
	var statsBatch []socketData
	err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(msg, &statsBatch)
	if err != nil {
		klog.ErrorS(err, "Unexpected error deserializing JSON", "payload", string(msg))
		return
	}

	for _, stats := range statsBatch {
		if sc.metricsPerHost && !sc.hosts.Has(stats.Host) {
			klog.V(3).InfoS("Skipping metric for host not being served", "host", stats.Host)
			continue
		}

		// Note these must match the order in requestTags at the top
		requestLabels := prometheus.Labels{
			"status":    stats.Status,
			"method":    stats.Method,
			"path":      stats.Path,
			"namespace": stats.Namespace,
			"ingress":   stats.Ingress,
			"service":   stats.Service,
			"canary":    stats.Canary,
		}
		if sc.metricsPerHost {
			requestLabels["host"] = stats.Host
		}

		collectorLabels := prometheus.Labels{
			"namespace": stats.Namespace,
			"ingress":   stats.Ingress,
			"status":    stats.Status,
			"service":   stats.Service,
			"canary":    stats.Canary,
		}

		latencyLabels := prometheus.Labels{
			"namespace": stats.Namespace,
			"ingress":   stats.Ingress,
			"service":   stats.Service,
			"canary":    stats.Canary,
		}

		requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
		if err != nil {
			klog.ErrorS(err, "Error fetching requests metric")
		} else {
			requestsMetric.Inc()
		}

		if stats.Latency != -1 {
			latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching latency metric")
			} else {
				latencyMetric.Observe(stats.Latency)
			}
		}

		if stats.RequestTime != -1 {
			requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching request duration metric")
			} else {
				requestTimeMetric.Observe(stats.RequestTime)
			}
		}

		if stats.RequestLength != -1 {
			requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching request length metric")
			} else {
				requestLengthMetric.Observe(stats.RequestLength)
			}
		}

		if stats.ResponseTime != -1 {
			responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching upstream response time metric")
			} else {
				responseTimeMetric.Observe(stats.ResponseTime)
			}
		}

		if stats.ResponseLength != -1 {
			bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching bytes sent metric")
			} else {
				bytesSentMetric.Observe(stats.ResponseLength)
			}

			responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching bytes sent metric")
			} else {
				responseSizeMetric.Observe(stats.ResponseLength)
			}
		}
	}
}