in oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/AccessLogServiceHandler.java [198:310]
/**
 * Converts one kernel-level access log into K8S metrics and forwards each of them to the source receiver.
 *
 * <p>The metrics come from {@code buildKernelLogMetrics}; {@code null} entries are dropped. Every
 * remaining metric is filled with the data of the operation carried by the kernel log (connect, accept,
 * close, read or write) and is then handed to {@code sourceReceiver}.
 *
 * @param node       node related to the log; used to parse time buckets, to compute durations and to
 *                   resolve network interface names
 * @param connection connection passed through to {@code buildKernelLogMetrics}
 * @param kernelLog  kernel log to dispatch
 */
protected void dispatchKernelLog(NodeInfo node, ConnectionInfo connection, AccessLogKernelLog kernelLog) {
    final List<K8SMetrics> metrics = buildKernelLogMetrics(node, connection, kernelLog)
        .stream().filter(Objects::nonNull).collect(Collectors.toList());
    for (K8SMetrics metric : metrics) {
        fillKernelOperation(metric, node, kernelLog);
        // send the metrics
        sourceReceiver.receive(metric);
    }
}

/**
 * Fills the metric with the data of whichever operation the kernel log carries.
 */
private void fillKernelOperation(K8SMetrics metric, NodeInfo node, AccessLogKernelLog kernelLog) {
    switch (kernelLog.getOperationCase()) {
        case CONNECT:
            fillKernelConnect(metric, node, kernelLog.getConnect());
            break;
        case ACCEPT:
            fillKernelAccept(metric, node, kernelLog.getAccept());
            break;
        case CLOSE:
            fillKernelClose(metric, node, kernelLog.getClose());
            break;
        case READ:
            fillKernelRead(metric, node, kernelLog.getRead());
            break;
        case WRITE:
            fillKernelWrite(metric, node, kernelLog.getWrite());
            break;
        default:
            // NOTE(review): any other operation case leaves the metric without a type or a time bucket,
            // yet the caller still forwards it. This keeps the existing behavior; confirm whether such
            // metrics should be skipped instead.
            break;
    }
}

/**
 * Fills the time bucket, type, duration and success flag of a connect operation.
 */
private void fillKernelConnect(K8SMetrics metric, NodeInfo node, AccessLogKernelConnectOperation connect) {
    metric.setTimeBucket(node.parseMinuteTimeBucket(connect.getStartTime()));
    metric.setType(K8SMetrics.TYPE_CONNECT);
    metric.setConnect(new K8SMetrics.Connect());
    metric.getConnect().setDuration(getDurationFromTimestamp(node, connect.getStartTime(), connect.getEndTime()));
    metric.getConnect().setSuccess(connect.getSuccess());
}

/**
 * Fills the time bucket, type and duration of an accept operation.
 */
private void fillKernelAccept(K8SMetrics metric, NodeInfo node, AccessLogKernelAcceptOperation accept) {
    metric.setTimeBucket(node.parseMinuteTimeBucket(accept.getStartTime()));
    metric.setType(K8SMetrics.TYPE_ACCEPT);
    metric.setAccept(new K8SMetrics.Accept());
    metric.getAccept().setDuration(getDurationFromTimestamp(node, accept.getStartTime(), accept.getEndTime()));
}

/**
 * Fills the time bucket, type, duration and success flag of a close operation.
 */
private void fillKernelClose(K8SMetrics metric, NodeInfo node, AccessLogKernelCloseOperation close) {
    metric.setTimeBucket(node.parseMinuteTimeBucket(close.getStartTime()));
    metric.setType(K8SMetrics.TYPE_CLOSE);
    metric.setClose(new K8SMetrics.Close());
    metric.getClose().setDuration(getDurationFromTimestamp(node, close.getStartTime(), close.getEndTime()));
    metric.getClose().setSuccess(close.getSuccess());
}

/**
 * Fills the read operation data, including its L4, L3 and L2 sections.
 */
private void fillKernelRead(K8SMetrics metric, NodeInfo node, AccessLogKernelReadOperation read) {
    metric.setTimeBucket(node.parseMinuteTimeBucket(read.getStartTime()));
    metric.setType(K8SMetrics.TYPE_READ);
    metric.setRead(new K8SMetrics.Read());
    metric.getRead().setDuration(getDurationFromTimestamp(node, read.getStartTime(), read.getEndTime()));
    metric.getRead().setSyscall(read.getSyscall().name());

    // l4
    final K8SMetrics.ReadL4 readL4 = new K8SMetrics.ReadL4();
    metric.getRead().setL4(readL4);
    readL4.setDuration(read.getL4Metrics().getTotalDuration());

    // l3
    final K8SMetrics.ReadL3 readL3 = new K8SMetrics.ReadL3();
    metric.getRead().setL3(readL3);
    readL3.setDuration(read.getL3Metrics().getTotalDuration());
    readL3.setRcvDuration(read.getL3Metrics().getTotalRecvDuration());
    readL3.setLocalDuration(read.getL3Metrics().getTotalLocalDuration());
    // the net filter fields are only set when at least one net filter was counted
    final long netFilterCount = read.getL3Metrics().getTotalNetFilterCount();
    if (netFilterCount > 0) {
        readL3.setNetFilterCount(netFilterCount);
        readL3.setNetFilterDuration(read.getL3Metrics().getTotalNetFilterDuration());
    }

    // l2
    final K8SMetrics.ReadL2 readL2 = new K8SMetrics.ReadL2();
    metric.getRead().setL2(readL2);
    readL2.setNetDeviceName(node.getNetInterfaceName(read.getL2Metrics().getIfindex()));
    readL2.setPackageCount(read.getL2Metrics().getTotalPackageCount());
    readL2.setTotalPackageSize(read.getL2Metrics().getTotalPackageSize());
    readL2.setPackageToQueueDuration(read.getL2Metrics().getTotalPackageToQueueDuration());
    readL2.setRcvPackageFromQueueDuration(read.getL2Metrics().getTotalRcvPackageFromQueueDuration());
}

/**
 * Fills the write operation data, including its L4, L3 and L2 sections.
 */
private void fillKernelWrite(K8SMetrics metric, NodeInfo node, AccessLogKernelWriteOperation write) {
    metric.setTimeBucket(node.parseMinuteTimeBucket(write.getStartTime()));
    metric.setType(K8SMetrics.TYPE_WRITE);
    metric.setWrite(new K8SMetrics.Write());
    metric.getWrite().setDuration(getDurationFromTimestamp(node, write.getStartTime(), write.getEndTime()));
    metric.getWrite().setSyscall(write.getSyscall().name());

    // l4
    final K8SMetrics.WriteL4 writeL4 = new K8SMetrics.WriteL4();
    metric.getWrite().setL4(writeL4);
    writeL4.setDuration(write.getL4Metrics().getTotalDuration());
    writeL4.setTransmitPackageCount(write.getL4Metrics().getTotalTransmitPackageCount());
    writeL4.setRetransmitPackageCount(write.getL4Metrics().getTotalRetransmitPackageCount());
    writeL4.setTotalPackageSize(write.getL4Metrics().getTotalPackageSize());

    // l3
    final K8SMetrics.WriteL3 writeL3 = new K8SMetrics.WriteL3();
    metric.getWrite().setL3(writeL3);
    writeL3.setDuration(write.getL3Metrics().getTotalDuration());
    writeL3.setLocalDuration(write.getL3Metrics().getTotalLocalDuration());
    writeL3.setOutputDuration(write.getL3Metrics().getTotalOutputDuration());
    // the resolve MAC fields are only set when at least one resolution was counted
    final long resolveMACCount = write.getL3Metrics().getTotalResolveMACCount();
    if (resolveMACCount > 0) {
        writeL3.setResolveMACCount(resolveMACCount);
        writeL3.setResolveMACDuration(write.getL3Metrics().getTotalResolveMACDuration());
    }
    // the net filter fields are only set when at least one net filter was counted
    final long netFilterCount = write.getL3Metrics().getTotalNetFilterCount();
    if (netFilterCount > 0) {
        writeL3.setNetFilterCount(netFilterCount);
        writeL3.setNetFilterDuration(write.getL3Metrics().getTotalNetFilterDuration());
    }

    // l2
    final K8SMetrics.WriteL2 writeL2 = new K8SMetrics.WriteL2();
    metric.getWrite().setL2(writeL2);
    writeL2.setDuration(write.getL2Metrics().getTotalDuration());
    writeL2.setNetworkDeviceName(node.getNetInterfaceName(write.getL2Metrics().getIfindex()));
    final long enterQueueBufferCount = write.getL2Metrics().getTotalEnterQueueBufferCount();
    writeL2.setEnterQueueBufferCount(enterQueueBufferCount);
    writeL2.setReadySendDuration(write.getL2Metrics().getTotalReadySendDuration());
    writeL2.setNetworkDeviceSendDuration(write.getL2Metrics().getTotalNetDeviceSendDuration());
}