in internal/ingress/metric/collectors/socket.go [234:339]
// handleMessage decodes msg as a JSON array of socketData entries and records
// each entry in the collector's metric vectors.
//
// A payload that fails to decode is logged and dropped as a whole. When
// per-host metrics are enabled, entries whose host is not in sc.hosts are
// skipped. A failure to resolve an individual metric is logged and does not
// prevent the remaining metrics of the same entry from being recorded.
func (sc *SocketCollector) handleMessage(msg []byte) {
	// Only pay for the []byte -> string copy when this verbosity is enabled;
	// the arguments would otherwise be evaluated on every message.
	if v := klog.V(5); v.Enabled() {
		v.InfoS("Metric", "message", string(msg))
	}

	// Unmarshal bytes
	var statsBatch []socketData
	if err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(msg, &statsBatch); err != nil {
		klog.ErrorS(err, "Unexpected error deserializing JSON", "payload", string(msg))
		return
	}

	for i := range statsBatch {
		// Index instead of ranging by value to avoid copying every entry.
		stats := &statsBatch[i]

		if sc.metricsPerHost && !sc.hosts.Has(stats.Host) {
			klog.V(3).InfoS("Skipping metric for host not being served", "host", stats.Host)
			continue
		}

		// Labels is a map, so ordering is irrelevant here; what matters is that
		// the set of keys matches the label names the metric vectors were
		// declared with (see requestTags at the top of the file).
		requestLabels := prometheus.Labels{
			"status":    stats.Status,
			"method":    stats.Method,
			"path":      stats.Path,
			"namespace": stats.Namespace,
			"ingress":   stats.Ingress,
			"service":   stats.Service,
			"canary":    stats.Canary,
		}
		if sc.metricsPerHost {
			requestLabels["host"] = stats.Host
		}

		collectorLabels := prometheus.Labels{
			"namespace": stats.Namespace,
			"ingress":   stats.Ingress,
			"status":    stats.Status,
			"service":   stats.Service,
			"canary":    stats.Canary,
		}

		latencyLabels := prometheus.Labels{
			"namespace": stats.Namespace,
			"ingress":   stats.Ingress,
			"service":   stats.Service,
			"canary":    stats.Canary,
		}

		requestsMetric, err := sc.requests.GetMetricWith(collectorLabels)
		if err != nil {
			klog.ErrorS(err, "Error fetching requests metric")
		} else {
			requestsMetric.Inc()
		}

		// For each of the fields below, a value of -1 means no observation is
		// recorded (presumably the sender's marker for "not available").
		if stats.Latency != -1 {
			latencyMetric, err := sc.upstreamLatency.GetMetricWith(latencyLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching latency metric")
			} else {
				latencyMetric.Observe(stats.Latency)
			}
		}

		if stats.RequestTime != -1 {
			requestTimeMetric, err := sc.requestTime.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching request duration metric")
			} else {
				requestTimeMetric.Observe(stats.RequestTime)
			}
		}

		if stats.RequestLength != -1 {
			requestLengthMetric, err := sc.requestLength.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching request length metric")
			} else {
				requestLengthMetric.Observe(stats.RequestLength)
			}
		}

		if stats.ResponseTime != -1 {
			responseTimeMetric, err := sc.responseTime.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching upstream response time metric")
			} else {
				responseTimeMetric.Observe(stats.ResponseTime)
			}
		}

		// The response length feeds two metrics: bytesSent and responseLength.
		if stats.ResponseLength != -1 {
			bytesSentMetric, err := sc.bytesSent.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching bytes sent metric")
			} else {
				bytesSentMetric.Observe(stats.ResponseLength)
			}

			responseSizeMetric, err := sc.responseLength.GetMetricWith(requestLabels)
			if err != nil {
				klog.ErrorS(err, "Error fetching response length metric")
			} else {
				responseSizeMetric.Observe(stats.ResponseLength)
			}
		}
	}
}