in pkg/profiling/task/network/analyze/layer4/listener.go [93:161]
// PreFlushConnectionMetrics refreshes the layer-4 metrics of the given connections
// right before they are flushed.
//
// It works in three steps:
//  1. For every entry of ccs, copy the write/read/write-RTT counters from ActiveInBPF
//     (when it is non-nil) into the connection's layer-4 metrics, and index the
//     connection context by its generated connection ID.
//  2. Walk bpfLoader.SocketConnectionStatsHistogram and apply each bucket count to the
//     matching histogram of the owning connection. Entries whose connection is not
//     part of ccs are skipped and left in the map.
//  3. Merge the pending exception contexts into the indexed connections.
//
// Histogram entries that belong to closed connections are removed from the BPF map
// once the walk has finished. Iteration and deletion failures are only logged; the
// returned error is always nil.
func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
	// rebuild to the map for helping quick search correlate ConnectionContext
	keyWithContext := make(map[string]*base.ConnectionContext, len(ccs))
	for _, cc := range ccs {
		// ready to flush histograms
		connection := cc.Connection
		layer4 := l.getMetrics(connection.Metrics)

		// basic counter update, only possible while the connection still has BPF-side data
		if active := cc.ActiveInBPF; active != nil {
			layer4.WriteCounter.UpdateToCurrent(active.WriteBytes, active.WriteCount, active.WriteExeTime)
			layer4.ReadCounter.UpdateToCurrent(active.ReadBytes, active.ReadCount, active.ReadExeTime)
			// the RTT counter carries no byte count, so zero is passed for it
			layer4.WriteRTTCounter.UpdateToCurrent(0, active.WriteRTTCount, active.WriteRTTExeTime)
		}

		// build cache
		keyWithContext[l.generateConID(connection.ConnectionID, connection.RandomID)] = connection

		if log.Enable(logrus.DebugLevel) {
			log.Debugf("found connection: %d_%d, %s relation: %s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, "+
				"is_closed: %t, write: %d bytes/%d, read: %d bytes/%d",
				connection.ConnectionID, connection.RandomID, connection.Role.String(),
				connection.LocalIP, connection.LocalPort, connection.LocalPid, connection.RemoteIP, connection.RemotePort,
				enums.ConnectionProtocolString(connection.Protocol), connection.IsSSL, connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
				layer4.WriteCounter.Cur.Count, layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
		}
	}

	var (
		key   HistogramDataKey
		count uint32
		// Keys of closed connections, removed only after the iteration is done.
		// NOTE(review): assumes SocketConnectionStatsHistogram is a cilium/ebpf hash map,
		// where deleting entries during iteration may restart the walk or revisit keys — confirm.
		closedKeys []HistogramDataKey
	)
	histogramIt := bpfLoader.SocketConnectionStatsHistogram.Iterate()
	// for-each the stats map
	for histogramIt.Next(&key, &count) {
		// if it's not relate to the ConnectionContext just ignore
		cc := keyWithContext[l.generateConID(key.ConnectionID, key.RandomID)]
		if cc == nil {
			continue
		}
		layer4 := l.getMetrics(cc.Metrics)

		// pick the histogram matching the data direction and type
		var histogram *SocketDataHistogramWithHistory
		switch key.DataDirection {
		case enums.SocketDataDirectionEgress:
			switch key.DataType {
			case enums.SocketDataStaticsTypeExeTime:
				histogram = layer4.WriteExeTimeHistogram
			case enums.SocketDataStaticsTypeRTT:
				histogram = layer4.WriteRTTHistogram
			}
		case enums.SocketDataDirectionIngress:
			// every ingress entry goes to the read execute time histogram
			histogram = layer4.ReadExeTimeHistogram
		}
		if histogram == nil {
			// log the key: it holds the unrecognized direction/type and the connection IDs
			log.Warnf("unknown the histogram data: %v", key)
			continue
		}

		// add the histogram data
		histogram.UpdateToCurrent(key.Bucket, count)

		// remember the stats for removal if the connection already closed
		if cc.ConnectionClosed {
			closedKeys = append(closedKeys, key)
		}
	}
	if err := histogramIt.Err(); err != nil {
		// keep flushing what has been collected so far, but do not hide the failure
		log.Warnf("iterate the connection stats histogram failure: %v", err)
	}

	// delete the stats of the closed connections
	for _, closedKey := range closedKeys {
		if err := bpfLoader.SocketConnectionStatsHistogram.Delete(closedKey); err != nil {
			log.Warnf("delete the connection stats failure: %v", err)
		}
	}

	// all the exception operations to the common
	exceptionContexts := l.cleanAndGetAllExceptionContexts()
	l.combineExceptionToConnections(keyWithContext, exceptionContexts)
	return nil
}