func()

in pkg/profiling/task/network/analyze/layer4/listener.go [93:161]


// PreFlushConnectionMetrics refreshes the layer-4 metrics of the given connections
// before they are flushed.
//
// It works in three steps:
//  1. For every connection that still has an ActiveInBPF record, hand the
//     write/read/write-RTT totals of that record to the matching counters.
//  2. Walk bpfLoader.SocketConnectionStatsHistogram and hand every bucket that
//     belongs to one of the given connections to the matching histogram. Entries
//     that belong to an already closed connection are removed from the map once
//     the walk has finished.
//  3. Merge the pending exception contexts into the connections.
//
// Failures while iterating the map or deleting entries are only logged as
// warnings; the returned error is currently always nil.
func (l *Listener) PreFlushConnectionMetrics(ccs []*base.ConnectionWithBPF, bpfLoader *bpf.Loader) error {
	// lookup table from the generated connection ID to its ConnectionContext,
	// used to correlate the histogram entries below
	keyWithContext := make(map[string]*base.ConnectionContext, len(ccs))
	for _, cc := range ccs {
		connection := cc.Connection
		layer4 := l.getMetrics(connection.Metrics)

		// basic counter update, only possible while the BPF side still tracks the connection
		if activeConnection := cc.ActiveInBPF; activeConnection != nil {
			layer4.WriteCounter.UpdateToCurrent(activeConnection.WriteBytes, activeConnection.WriteCount, activeConnection.WriteExeTime)
			layer4.ReadCounter.UpdateToCurrent(activeConnection.ReadBytes, activeConnection.ReadCount, activeConnection.ReadExeTime)
			layer4.WriteRTTCounter.UpdateToCurrent(0, activeConnection.WriteRTTCount, activeConnection.WriteRTTExeTime)
		}

		// build cache
		keyWithContext[l.generateConID(connection.ConnectionID, connection.RandomID)] = connection

		if log.Enable(logrus.DebugLevel) {
			log.Debugf("found connection: %d_%d, %s relation: %s:%d(%d) -> %s:%d, protocol: %s, is_ssl: %t, "+
				"is_closed: %t, write: %d bytes/%d, read: %d bytes/%d",
				connection.ConnectionID, connection.RandomID, connection.Role.String(),
				connection.LocalIP, connection.LocalPort, connection.LocalPid, connection.RemoteIP, connection.RemotePort,
				enums.ConnectionProtocolString(connection.Protocol), connection.IsSSL, connection.ConnectionClosed, layer4.WriteCounter.Cur.Bytes,
				layer4.WriteCounter.Cur.Count, layer4.ReadCounter.Cur.Bytes, layer4.ReadCounter.Cur.Count)
		}
	}

	var (
		key   HistogramDataKey
		count uint32
		// keys of closed connections; removed after the iteration because
		// deleting from the map while iterating it may repeat or skip entries
		closedKeys []HistogramDataKey
	)
	histogramIt := bpfLoader.SocketConnectionStatsHistogram.Iterate()
	// for-each the stats map
	for histogramIt.Next(&key, &count) {
		// entries that do not relate to one of the given connections are ignored
		cc := keyWithContext[l.generateConID(key.ConnectionID, key.RandomID)]
		if cc == nil {
			continue
		}

		// Schedule the cleanup before validating the entry, so an unrecognized
		// entry of a closed connection does not stay in the map forever.
		// key is reused by the iterator, append stores a copy of the current value.
		if cc.ConnectionClosed {
			closedKeys = append(closedKeys, key)
		}

		layer4 := l.getMetrics(cc.Metrics)

		// pick the histogram matching the direction (and, for egress, the data type)
		var histogram *SocketDataHistogramWithHistory
		switch key.DataDirection {
		case enums.SocketDataDirectionEgress:
			switch key.DataType {
			case enums.SocketDataStaticsTypeExeTime:
				histogram = layer4.WriteExeTimeHistogram
			case enums.SocketDataStaticsTypeRTT:
				histogram = layer4.WriteRTTHistogram
			}
		case enums.SocketDataDirectionIngress:
			// NOTE(review): every ingress entry is treated as execute time,
			// the data type is not inspected here — confirm this is intended
			histogram = layer4.ReadExeTimeHistogram
		}
		if histogram == nil {
			log.Warnf("unknown the histogram data: %v", cc)
			continue
		}
		histogram.UpdateToCurrent(key.Bucket, count)
	}
	// Next returning false may also mean the iteration failed; do not hide that
	if err := histogramIt.Err(); err != nil {
		log.Warnf("iterate the connection stats failure: %v", err)
	}

	// delete the stats of the connections which are already closed
	for _, closedKey := range closedKeys {
		if err := bpfLoader.SocketConnectionStatsHistogram.Delete(closedKey); err != nil {
			log.Warnf("delete the connection stats failure: %v", err)
		}
	}

	// all the exception operations to the common
	exceptionContexts := l.cleanAndGetAllExceptionContexts()
	l.combineExceptionToConnections(keyWithContext, exceptionContexts)
	return nil
}