override def genBatchScanTransformerMetrics()

in backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxMetricsApi.scala [75:327]


  override def genBatchScanTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
    Map(
      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
      "inputVectors" -> SQLMetrics.createMetric(sparkContext, "number of input vectors"),
      "inputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of input bytes"),
      "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
      "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of batch scan"),
      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
      "scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "scan time"),
      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
      "numMemoryAllocations" -> SQLMetrics.createMetric(
        sparkContext,
        "number of memory allocations"),
      "numDynamicFiltersAccepted" -> SQLMetrics.createMetric(
        sparkContext,
        "number of dynamic filters accepted"),
      "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
      "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
      "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
      "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
      "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
      "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
        sparkContext,
        "remaining filter time"),
      "ioWaitTime" -> SQLMetrics.createTimingMetric(sparkContext, "io wait time"),
      "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
      "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
    )

  override def genBatchScanTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = new BatchScanMetricsUpdater(metrics)

  override def genHiveTableScanTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] =
    Map(
      "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
      "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
      "scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan and filter"),
      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
      "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
      "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read"),
      "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
      "pruningTime" ->
        SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
      "numMemoryAllocations" -> SQLMetrics.createMetric(
        sparkContext,
        "number of memory allocations"),
      "numDynamicFiltersAccepted" -> SQLMetrics.createMetric(
        sparkContext,
        "number of dynamic filters accepted"),
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
      "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
      "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
      "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
      "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
      "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
        sparkContext,
        "remaining filter time"),
      "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
      "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
      "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
    )

  override def genHiveTableScanTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = new HiveTableScanMetricsUpdater(metrics)

  override def genFileSourceScanTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] =
    Map(
      "rawInputRows" -> SQLMetrics.createMetric(sparkContext, "number of raw input rows"),
      "rawInputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of raw input bytes"),
      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
      "scanTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of scan and filter"),
      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
      "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
      "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
      "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read"),
      "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
      "pruningTime" ->
        SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"),
      "numMemoryAllocations" -> SQLMetrics.createMetric(
        sparkContext,
        "number of memory allocations"),
      "numDynamicFiltersAccepted" -> SQLMetrics.createMetric(
        sparkContext,
        "number of dynamic filters accepted"),
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "skippedSplits" -> SQLMetrics.createMetric(sparkContext, "number of skipped splits"),
      "processedSplits" -> SQLMetrics.createMetric(sparkContext, "number of processed splits"),
      "preloadSplits" -> SQLMetrics.createMetric(sparkContext, "number of preloaded splits"),
      "skippedStrides" -> SQLMetrics.createMetric(sparkContext, "number of skipped row groups"),
      "processedStrides" -> SQLMetrics.createMetric(sparkContext, "number of processed row groups"),
      "remainingFilterTime" -> SQLMetrics.createNanoTimingMetric(
        sparkContext,
        "remaining filter time"),
      "ioWaitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "io wait time"),
      "storageReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "storage read bytes"),
      "localReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "local ssd read bytes"),
      "ramReadBytes" -> SQLMetrics.createSizeMetric(sparkContext, "ram read bytes")
    )

  override def genFileSourceScanTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = new FileSourceScanMetricsUpdater(metrics)

  override def genFilterTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
    Map(
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of filter"),
      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
      "numMemoryAllocations" -> SQLMetrics.createMetric(
        sparkContext,
        "number of memory allocations")
    )

  override def genFilterTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric],
      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
    new FilterMetricsUpdater(metrics, extraMetrics)

  override def genProjectTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
    Map(
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of project"),
      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
      "numMemoryAllocations" -> SQLMetrics.createMetric(
        sparkContext,
        "number of memory allocations")
    )

  override def genProjectTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric],
      extraMetrics: Seq[(String, SQLMetric)] = Seq.empty): MetricsUpdater =
    new ProjectMetricsUpdater(metrics, extraMetrics)

  override def genHashAggregateTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] =
    Map(
      "aggOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "aggOutputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
      "aggOutputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
      "aggCpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
      "aggWallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of aggregation"),
      "aggPeakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
      "aggNumMemoryAllocations" -> SQLMetrics.createMetric(
        sparkContext,
        "number of memory allocations"),
      "aggSpilledBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of spilled bytes"),
      "aggSpilledRows" -> SQLMetrics.createMetric(sparkContext, "number of spilled rows"),
      "aggSpilledPartitions" -> SQLMetrics.createMetric(
        sparkContext,
        "number of spilled partitions"),
      "aggSpilledFiles" -> SQLMetrics.createMetric(sparkContext, "number of spilled files"),
      "flushRowCount" -> SQLMetrics.createMetric(sparkContext, "number of flushed rows"),
      "loadedToValueHook" -> SQLMetrics.createMetric(
        sparkContext,
        "number of pushdown aggregations"),
      "rowConstructionCpuCount" -> SQLMetrics.createMetric(
        sparkContext,
        "rowConstruction cpu wall time count"),
      "rowConstructionWallNanos" -> SQLMetrics.createNanoTimingMetric(
        sparkContext,
        "time of rowConstruction"),
      "extractionCpuCount" -> SQLMetrics.createMetric(
        sparkContext,
        "extraction cpu wall time count"),
      "extractionWallNanos" -> SQLMetrics.createNanoTimingMetric(
        sparkContext,
        "time of extraction"),
      "finalOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of final output rows"),
      "finalOutputVectors" -> SQLMetrics.createMetric(
        sparkContext,
        "number of final output vectors")
    )

  override def genHashAggregateTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater =
    new HashAggregateMetricsUpdaterImpl(metrics)

  override def genExpandTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
    Map(
      "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
      "outputVectors" -> SQLMetrics.createMetric(sparkContext, "number of output vectors"),
      "outputBytes" -> SQLMetrics.createSizeMetric(sparkContext, "number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time of expand"),
      "cpuCount" -> SQLMetrics.createMetric(sparkContext, "cpu wall time count"),
      "peakMemoryBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory bytes"),
      "numMemoryAllocations" -> SQLMetrics.createMetric(
        sparkContext,
        "number of memory allocations")
    )

  override def genExpandTransformerMetricsUpdater(metrics: Map[String, SQLMetric]): MetricsUpdater =
    new ExpandMetricsUpdater(metrics)

  override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] =
    Map("numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))

  override def genColumnarShuffleExchangeMetrics(
      sparkContext: SparkContext,
      isSort: Boolean): Map[String, SQLMetric] = {
    val baseMetrics = Map(
      "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions"),
      "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
      "bytesSpilled" -> SQLMetrics.createSizeMetric(sparkContext, "shuffle bytes spilled"),
      "avgReadBatchNumRows" -> SQLMetrics
        .createAverageMetric(sparkContext, "avg read batch num rows"),
      "numInputRows" -> SQLMetrics.createMetric(sparkContext, "number of input rows"),
      "numOutputRows" -> SQLMetrics
        .createMetric(sparkContext, "number of output rows"),
      "inputBatches" -> SQLMetrics
        .createMetric(sparkContext, "number of input batches"),
      "spillTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to spill"),
      "compressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to compress"),
      "decompressTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to decompress"),
      "deserializeTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to deserialize"),
      "shuffleWallTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "shuffle wall time"),
      // For hash shuffle writer, the peak bytes represents the maximum split buffer size.
      // For sort shuffle writer, the peak bytes represents the maximum
      // row buffer + sort buffer size.
      "peakBytes" -> SQLMetrics.createSizeMetric(sparkContext, "peak bytes allocated")
    )
    if (isSort) {
      baseMetrics ++ Map(
        "sortTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to shuffle sort"),
        "c2rTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to shuffle c2r")
      )
    } else {
      baseMetrics ++ Map(
        "splitTime" -> SQLMetrics.createNanoTimingMetric(sparkContext, "time to split")
      )
    }
  }