in backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala [75:327]
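
// Per-operator SQL metric factories for the Velox backend, each paired with the
// MetricsUpdater that fills those metrics from native execution statistics. The
// map keys are the names the matching updater looks up, so the two must stay in sync.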
override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
"inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of batch scan"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "scan time"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations"),
"numDynamicFiltersAccepted" -> SQLMetrics.createMetric(
sparkContext,
"number of dynamic filters accepted"),
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time"),
"ioWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
)
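
// Pairs the batch scan metric map above with the updater that populates it from
// the native scan statistics.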
override def genBatchScanTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new BatchScanMetricsUpdater(metrics)
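
// Hive table scan reports the same metric set as the file source scan below: the
// native split/stride counters plus file-level metrics (numFiles, metadataTime,
// filesSize, numPartitions, pruningTime).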
override def genHiveTableScanTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan and filter"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
"metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
"filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read"),
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
"pruningTime" ->
SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations"),
"numDynamicFiltersAccepted" -> SQLMetrics.createMetric(
sparkContext,
"number of dynamic filters accepted"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time"),
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
)
override def genHiveTableScanTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new HiveTableScanMetricsUpdater(metrics)
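
// File source scan metrics, identical to the Hive table scan set above. The
// numFiles, filesSize, numPartitions, metadataTime and pruningTime entries mirror
// vanilla Spark's driver-side file scan metrics.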
override def genFileSourceScanTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
"rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan and filter"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
"metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
"filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read"),
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
"pruningTime" ->
SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations"),
"numDynamicFiltersAccepted" -> SQLMetrics.createMetric(
sparkContext,
"number of dynamic filters accepted"),
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
"processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
"preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
"skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
"processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
"remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"remaining filter time"),
"ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
"storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
"localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
"ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
)
override def genFileSourceScanTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)
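
// Filter metrics: output rows/vectors/bytes plus wall time, CPU count and memory
// statistics for the native filter operator.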
override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of filter"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
)
override def genFilterTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new FilterMetricsUpdater(metrics, extraMetrics)
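
// Project exposes the same metric shape as filter.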
override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of project"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
)
override def genProjectTransformerMetricsUpdater(
metrics: Map[String, SQLMetric],
extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
new ProjectMetricsUpdater(metrics, extraMetrics)
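
// Hash aggregate metrics use an "agg" prefix and additionally cover spill activity
// (bytes, rows, partitions, files), flushed rows, pushed-down aggregations, and the
// separate row-construction and extraction phases.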
override def genHashAggregateTransformerMetrics(
sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"aggOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"aggOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"aggOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"aggCpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"aggWallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of aggregation"),
"aggPeakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"aggNumMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations"),
"aggSpilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of spilled bytes"),
"aggSpilledRows" -> SQLMetrics.createMetric(sparkContext, "number of spilled rows"),
"aggSpilledPartitions" -> SQLMetrics.createMetric(
sparkContext,
"number of spilled partitions"),
"aggSpilledFiles" -> SQLMetrics.createMetric(sparkContext, "number of spilled files"),
"flushRowCount" -> SQLMetrics.createMetric(sparkContext, "number of flushed rows"),
"loadedToValueHook" -> SQLMetrics.createMetric(
sparkContext,
"number of pushdown aggregations"),
"rowConstructionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"rowConstruction cpu wall time count"),
"rowConstructionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of rowConstruction"),
"extractionCpuCount" -> SQLMetrics.createMetric(
sparkContext,
"extraction cpu wall time count"),
"extractionWallNanos" -> SQLMetrics.createNanoTimingMetric(
sparkContext,
"time of extraction"),
"finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of final output rows"),
"finalOutputVectors" -> SQLMetrics.createMetric(
sparkContext,
"number of final output vectors")
)
override def genHashAggregateTransformerMetricsUpdater(
metrics: Map[String, SQLMetric]): MetricsUpdater =
new HashAggregateMetricsUpdaterImpl(metrics)
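
// Expand metrics mirror the filter/project set.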
override def genExpandTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
"outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
"wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of expand"),
"cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
"peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
"numMemoryAllocations" -> SQLMetrics.createMetric(
sparkContext,
"number of memory allocations")
)
override def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
new ExpandMetricsUpdater(metrics)
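
// The custom expand path only reports an output row count.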
override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
override def genColumnarShuffleExchangeMetrics(
sparkContext: SparkContext,
isSort: Boolean): Map[String, SQLMetric] = {
val baseMetrics = Map(
"numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"),
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"bytesSpilled" -> SQLMetrics.createSizeMetric(sparkContext, "shuffle bytes spilled"),
"avgReadBatchNumRows" -> SQLMetrics
.createAverageMetric(sparkContext, "avg read batch num rows"),
"numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics
.createMetric(sparkContext, "number of output rows"),
"inputBatches" -> SQLMetrics
.createMetric(sparkContext, "number of input batches"),
"spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to spill"),
"compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to compress"),
"decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to decompress"),
"deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to deserialize"),
"shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle wall time"),
// For the hash shuffle writer, the peak bytes metric is the maximum split buffer
// size. For the sort shuffle writer, it is the maximum row buffer plus sort buffer
// size.
"peakBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak bytes allocated")
)
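// The sort-based shuffle writer adds sort and columnar-to-row (c2r) conversion
// timers; the hash-based writer tracks split time instead.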
if (isSort) {
baseMetrics ++ Map(
"sortTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to shuffle sort"),
"c2rTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to shuffle c2r")
)
} else {
baseMetrics ++ Map(
"splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to split")
)
}
}
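
// Illustrative only, not part of VeloxMetricsApi: a minimal sketch of how a metric
// map produced by the factories above can be filled once native statistics are
// available. NativeScanStats and reportScanStats are hypothetical names used for
// this sketch; only the SQLMetric `+=` calls are real Spark APIs.
import org.apache.spark.sql.execution.metric.SQLMetric

final case class NativeScanStats(outputRows: Long, outputBytes: Long, wallNanos: Long)

def reportScanStats(metrics: Map[String, SQLMetric], stats: NativeScanStats): Unit = {
  // The keys must match those registered by the gen*Metrics methods above.
  metrics("numOutputRows") += stats.outputRows
  metrics("outputBytes") += stats.outputBytes
  metrics("wallNanos") += stats.wallNanos
}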