in backends-velox/src/main/scala/org/apache/gluten/metrics/MetricsUtil.scala [86:202]
/**
 * Collapses a list of per-suite [[OperatorMetrics]] into a single merged instance.
 *
 * Returns null when the list is empty. The metrics list is ordered from the
 * output end back to the input end, so input-side figures are taken from the
 * last element and output-side figures from the first element. All remaining
 * counters are summed across the list, except peakMemoryBytes, which takes
 * the maximum observed value.
 */
private def mergeMetrics(operatorMetrics: JList[OperatorMetrics]): OperatorMetrics = {
  if (operatorMetrics.size() == 0) {
    return null
  }
  // Metrics are accessed from end to start: the last suite carries the
  // input-side metrics, the first suite carries the output-side metrics.
  val outputSide = operatorMetrics.get(0)
  val inputSide = operatorMetrics.get(operatorMetrics.size() - 1)

  // Accumulators for the counters that are aggregated across every suite.
  var cpuCount: Long = 0
  var wallNanos: Long = 0
  var peakMemoryBytes: Long = 0
  var numMemoryAllocations: Long = 0
  var spilledInputBytes: Long = 0
  var spilledBytes: Long = 0
  var spilledRows: Long = 0
  var spilledPartitions: Long = 0
  var spilledFiles: Long = 0
  var numDynamicFiltersProduced: Long = 0
  var numDynamicFiltersAccepted: Long = 0
  var numReplacedWithDynamicFilterRows: Long = 0
  var flushRowCount: Long = 0
  var loadedToValueHook: Long = 0
  var scanTime: Long = 0
  var skippedSplits: Long = 0
  var processedSplits: Long = 0
  var skippedStrides: Long = 0
  var processedStrides: Long = 0
  var remainingFilterTime: Long = 0
  var ioWaitTime: Long = 0
  var storageReadBytes: Long = 0
  var localReadBytes: Long = 0
  var ramReadBytes: Long = 0
  var preloadSplits: Long = 0
  var numWrittenFiles: Long = 0

  var i = 0
  while (i < operatorMetrics.size()) {
    val m = operatorMetrics.get(i)
    cpuCount += m.cpuCount
    wallNanos += m.wallNanos
    // Peak memory is a high-water mark, not additive across suites.
    peakMemoryBytes = peakMemoryBytes.max(m.peakMemoryBytes)
    numMemoryAllocations += m.numMemoryAllocations
    spilledInputBytes += m.spilledInputBytes
    spilledBytes += m.spilledBytes
    spilledRows += m.spilledRows
    spilledPartitions += m.spilledPartitions
    spilledFiles += m.spilledFiles
    numDynamicFiltersProduced += m.numDynamicFiltersProduced
    numDynamicFiltersAccepted += m.numDynamicFiltersAccepted
    numReplacedWithDynamicFilterRows += m.numReplacedWithDynamicFilterRows
    flushRowCount += m.flushRowCount
    loadedToValueHook += m.loadedToValueHook
    scanTime += m.scanTime
    skippedSplits += m.skippedSplits
    processedSplits += m.processedSplits
    skippedStrides += m.skippedStrides
    processedStrides += m.processedStrides
    remainingFilterTime += m.remainingFilterTime
    ioWaitTime += m.ioWaitTime
    storageReadBytes += m.storageReadBytes
    localReadBytes += m.localReadBytes
    ramReadBytes += m.ramReadBytes
    preloadSplits += m.preloadSplits
    numWrittenFiles += m.numWrittenFiles
    i += 1
  }

  // NOTE: the constructor's positional order below must match OperatorMetrics;
  // physicalWrittenBytes and writeIOTime intentionally appear near the end.
  new OperatorMetrics(
    inputSide.inputRows,
    inputSide.inputVectors,
    inputSide.inputBytes,
    inputSide.rawInputRows,
    inputSide.rawInputBytes,
    outputSide.outputRows,
    outputSide.outputVectors,
    outputSide.outputBytes,
    cpuCount,
    wallNanos,
    peakMemoryBytes,
    numMemoryAllocations,
    spilledInputBytes,
    spilledBytes,
    spilledRows,
    spilledPartitions,
    spilledFiles,
    numDynamicFiltersProduced,
    numDynamicFiltersAccepted,
    numReplacedWithDynamicFilterRows,
    flushRowCount,
    loadedToValueHook,
    scanTime,
    skippedSplits,
    processedSplits,
    skippedStrides,
    processedStrides,
    remainingFilterTime,
    ioWaitTime,
    storageReadBytes,
    localReadBytes,
    ramReadBytes,
    preloadSplits,
    outputSide.physicalWrittenBytes,
    outputSide.writeIOTime,
    numWrittenFiles
  )
}