override def genBatchScanTransformerMetrics()

in backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/MetricsApiImpl.scala [55:548]


  /**
   * Creates the SQL metrics registered for the batch scan transformer.
   *
   * `ioWaitTime` is created as a nano timing metric, the same way the Hive table scan and file
   * source scan definitions in this file register it, and the same way every other timing metric
   * of this operator is registered.
   * NOTE(review): this assumes the updater feeds the io wait time in nanoseconds -- confirm
   * against BatchScanMetricsUpdater.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genBatchScanTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)
    def nanoTiming(label: String): SQLMetric =
      SQLMetrics.createNanoTimingMetric(sparkContext, label)

    Map(
      "inputRows" -> counter("number of input rows"),
      "inputVectors" -> counter("number of input vectors"),
      "inputBytes" -> bytes("number of input bytes"),
      "rawInputRows" -> counter("number of raw input rows"),
      "rawInputBytes" -> bytes("number of raw input bytes"),
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> nanoTiming("totaltime of batch scan"),
      "cpuCount" -> counter("cpu wall time count"),
      "scanTime" -> nanoTiming("scan time"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations"),
      "numDynamicFiltersAccepted" -> counter("number of dynamic filters accepted"),
      "skippedSplits" -> counter("number of skipped splits"),
      "processedSplits" -> counter("number of processed splits"),
      "preloadSplits" -> counter("number of preloaded splits"),
      "skippedStrides" -> counter("number of skipped row groups"),
      "processedStrides" -> counter("number of processed row groups"),
      "remainingFilterTime" -> nanoTiming("remaining filter time"),
      // Nano timing, consistent with the Hive and file source scan metric definitions.
      "ioWaitTime" -> nanoTiming("io wait time")
    )
  }

  /** Wraps the given metrics in a new [[BatchScanMetricsUpdater]]. */
  override def genBatchScanTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new BatchScanMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the Hive table scan transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genHiveTableScanTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)
    def timing(label: String): SQLMetric = SQLMetrics.createTimingMetric(sparkContext, label)
    def nanoTiming(label: String): SQLMetric =
      SQLMetrics.createNanoTimingMetric(sparkContext, label)

    Map(
      "rawInputRows" -> counter("number of raw input rows"),
      "rawInputBytes" -> bytes("number of raw input bytes"),
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "scanTime" -> nanoTiming("totaltime of scan"),
      "wallNanos" -> nanoTiming("totaltime of scan and filter"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numFiles" -> counter("number of files read"),
      "metadataTime" -> timing("metadata time"),
      "filesSize" -> bytes("size of files read"),
      "numPartitions" -> counter("number of partitions read"),
      "pruningTime" -> timing("dynamic partition pruning time"),
      "numMemoryAllocations" -> counter("number of memory allocations"),
      "numDynamicFiltersAccepted" -> counter("number of dynamic filters accepted"),
      "numOutputRows" -> counter("number of output rows"),
      "skippedSplits" -> counter("number of skipped splits"),
      "processedSplits" -> counter("number of processed splits"),
      "preloadSplits" -> counter("number of preloaded splits"),
      "skippedStrides" -> counter("number of skipped row groups"),
      "processedStrides" -> counter("number of processed row groups"),
      "remainingFilterTime" -> nanoTiming("remaining filter time"),
      "ioWaitTime" -> nanoTiming("io wait time")
    )
  }

  /** Wraps the given metrics in a new [[HiveTableScanMetricsUpdater]]. */
  override def genHiveTableScanTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new HiveTableScanMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the file source scan transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genFileSourceScanTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)
    def timing(label: String): SQLMetric = SQLMetrics.createTimingMetric(sparkContext, label)
    def nanoTiming(label: String): SQLMetric =
      SQLMetrics.createNanoTimingMetric(sparkContext, label)

    Map(
      "rawInputRows" -> counter("number of raw input rows"),
      "rawInputBytes" -> bytes("number of raw input bytes"),
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "scanTime" -> nanoTiming("totaltime of scan"),
      "wallNanos" -> nanoTiming("totaltime of scan and filter"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numFiles" -> counter("number of files read"),
      "metadataTime" -> timing("metadata time"),
      "filesSize" -> bytes("size of files read"),
      "numPartitions" -> counter("number of partitions read"),
      "pruningTime" -> timing("dynamic partition pruning time"),
      "numMemoryAllocations" -> counter("number of memory allocations"),
      "numDynamicFiltersAccepted" -> counter("number of dynamic filters accepted"),
      "numOutputRows" -> counter("number of output rows"),
      "skippedSplits" -> counter("number of skipped splits"),
      "processedSplits" -> counter("number of processed splits"),
      "preloadSplits" -> counter("number of preloaded splits"),
      "skippedStrides" -> counter("number of skipped row groups"),
      "processedStrides" -> counter("number of processed row groups"),
      "remainingFilterTime" -> nanoTiming("remaining filter time"),
      "ioWaitTime" -> nanoTiming("io wait time")
    )
  }

  /** Wraps the given metrics in a new [[FileSourceScanMetricsUpdater]]. */
  override def genFileSourceScanTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new FileSourceScanMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the filter transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genFilterTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)

    Map(
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of filter"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations")
    )
  }

  /** Wraps the given metrics in a new [[FilterMetricsUpdater]]. */
  override def genFilterTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new FilterMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the project transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genProjectTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)

    Map(
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of project"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations")
    )
  }

  /** Wraps the given metrics in a new [[ProjectMetricsUpdater]]. */
  override def genProjectTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new ProjectMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the hash aggregate transformer, covering the
   * aggregation itself plus its pre-projection, post-projection and extraction entries.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genHashAggregateTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)
    def nanoTiming(label: String): SQLMetric =
      SQLMetrics.createNanoTimingMetric(sparkContext, label)

    Map(
      "aggOutputRows" -> counter("number of output rows"),
      "aggOutputVectors" -> counter("number of output vectors"),
      "aggOutputBytes" -> bytes("number of output bytes"),
      "aggCpuCount" -> counter("cpu wall time count"),
      "aggWallNanos" -> nanoTiming("totaltime of aggregation"),
      "aggPeakMemoryBytes" -> bytes("peak memory bytes"),
      "aggNumMemoryAllocations" -> counter("number of memory allocations"),
      "aggSpilledBytes" -> bytes("number of spilled bytes"),
      "aggSpilledRows" -> counter("number of spilled rows"),
      "aggSpilledPartitions" -> counter("number of spilled partitions"),
      "aggSpilledFiles" -> counter("number of spilled files"),
      "flushRowCount" -> counter("number of flushed rows"),
      "preProjectionCpuCount" -> counter("preProjection cpu wall time count"),
      "preProjectionWallNanos" -> nanoTiming("totaltime of preProjection"),
      "postProjectionCpuCount" -> counter("postProjection cpu wall time count"),
      "postProjectionWallNanos" -> nanoTiming("totaltime of postProjection"),
      "extractionCpuCount" -> counter("extraction cpu wall time count"),
      "extractionWallNanos" -> nanoTiming("totaltime of extraction"),
      "finalOutputRows" -> counter("number of final output rows"),
      "finalOutputVectors" -> counter("number of final output vectors")
    )
  }

  /** Wraps the given metrics in a new [[HashAggregateMetricsUpdaterImpl]]. */
  override def genHashAggregateTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new HashAggregateMetricsUpdaterImpl(metrics)
  }

  /**
   * Creates the SQL metrics registered for the expand transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genExpandTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)

    Map(
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of expand"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations")
    )
  }

  /** Wraps the given metrics in a new [[ExpandMetricsUpdater]]. */
  override def genExpandTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new ExpandMetricsUpdater(metrics)
  }

  /**
   * Creates the single output-row-count metric registered under `numOutputRows`.
   *
   * @param sparkContext context the metric is created against
   * @return a one-entry map holding the created metric
   */
  override def genCustomExpandMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
    val outputRowCount = SQLMetrics.createMetric(sparkContext, "number of output rows")
    Map("numOutputRows" -> outputRowCount)
  }

  /**
   * Creates the SQL metrics registered for the columnar shuffle exchange.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genColumnarShuffleExchangeMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)
    def nanoTiming(label: String): SQLMetric =
      SQLMetrics.createNanoTimingMetric(sparkContext, label)

    Map(
      "dataSize" -> bytes("data size"),
      "numPartitions" -> counter("number of partitions"),
      "bytesSpilled" -> bytes("shuffle bytes spilled"),
      "splitBufferSize" -> bytes("split buffer size total"),
      "splitTime" -> nanoTiming("totaltime to split"),
      "spillTime" -> nanoTiming("totaltime to spill"),
      "compressTime" -> nanoTiming("totaltime to compress"),
      "prepareTime" -> nanoTiming("totaltime to prepare"),
      "decompressTime" -> nanoTiming("totaltime decompress"),
      "ipcTime" -> nanoTiming("totaltime ipc"),
      "deserializeTime" -> nanoTiming("totaltime deserialize"),
      // The only average-typed metric of this operator, so it is created inline.
      "avgReadBatchNumRows" ->
        SQLMetrics.createAverageMetric(sparkContext, "avg read batch num rows"),
      "numInputRows" -> counter("number of input rows"),
      "numOutputRows" -> counter("number of output rows"),
      "inputBatches" -> counter("number of input batches"),
      "uncompressedDataSize" -> bytes("uncompressed data size")
    )
  }

  /**
   * Creates the SQL metrics registered for the window transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genWindowTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)

    Map(
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of window"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations")
    )
  }

  /** Wraps the given metrics in a new [[WindowMetricsUpdater]]. */
  override def genWindowTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new WindowMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the columnar-to-row conversion.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genColumnarToRowMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
    // Metrics are created in the same order in which they are put into the map.
    val outputRows = SQLMetrics.createMetric(sparkContext, "number of output rows")
    val inputBatches = SQLMetrics.createMetric(sparkContext, "number of input batches")
    val convertTime = SQLMetrics.createTimingMetric(sparkContext, "time to convert")

    Map(
      "numOutputRows" -> outputRows,
      "numInputBatches" -> inputBatches,
      "convertTime" -> convertTime)
  }

  /**
   * Creates the SQL metrics registered for the row-to-columnar conversion.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genRowToColumnarMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
    // Metrics are created in the same order in which they are put into the map.
    val inputRows = SQLMetrics.createMetric(sparkContext, "number of input rows")
    val outputBatches = SQLMetrics.createMetric(sparkContext, "number of output batches")
    val convertTime = SQLMetrics.createTimingMetric(sparkContext, "totaltime to convert")

    Map(
      "numInputRows" -> inputRows,
      "numOutputBatches" -> outputBatches,
      "convertTime" -> convertTime)
  }

  /**
   * Creates the SQL metrics registered for the limit transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genLimitTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)

    Map(
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of limit"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations")
    )
  }

  /** Wraps the given metrics in a new [[LimitMetricsUpdater]]. */
  override def genLimitTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new LimitMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the write files transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  def genWriteFilesTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
    // Metrics are created in the same order in which they are put into the map.
    val writtenBytes = SQLMetrics.createMetric(sparkContext, "number of written bytes")
    val writtenFiles = SQLMetrics.createMetric(sparkContext, "number of written files")

    Map(
      "physicalWrittenBytes" -> writtenBytes,
      "numWrittenFiles" -> writtenFiles)
  }

  /** Wraps the given metrics in a new [[WriteFilesMetricsUpdater]]. */
  def genWriteFilesTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new WriteFilesMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the sort transformer, including its spill metrics.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genSortTransformerMetrics(sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)

    Map(
      "outputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of sort"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations"),
      "spilledBytes" -> bytes("total bytes written for spilling"),
      "spilledRows" -> counter("total rows written for spilling"),
      "spilledPartitions" -> counter("total spilled partitions"),
      "spilledFiles" -> counter("total spilled files")
    )
  }

  /** Wraps the given metrics in a new [[SortMetricsUpdater]]. */
  override def genSortTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new SortMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the sort merge join transformer, covering the join
   * itself plus its stream/buffer pre-projection and post-projection entries.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genSortMergeJoinTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)
    def nanoTiming(label: String): SQLMetric =
      SQLMetrics.createNanoTimingMetric(sparkContext, label)

    Map(
      "numOutputRows" -> counter("number of output rows"),
      "numOutputVectors" -> counter("number of output vectors"),
      "numOutputBytes" -> bytes("number of output bytes"),
      "wallNanos" -> nanoTiming("totaltime of merge join"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations"),
      "streamPreProjectionCpuCount" -> counter("stream preProject cpu wall time count"),
      "streamPreProjectionWallNanos" -> nanoTiming("totaltime of stream preProjection"),
      "bufferPreProjectionCpuCount" -> counter("buffer preProject cpu wall time count"),
      "bufferPreProjectionWallNanos" -> nanoTiming("totaltime of buffer preProjection"),
      "postProjectionCpuCount" -> counter("postProject cpu wall time count"),
      "postProjectionWallNanos" -> nanoTiming("totaltime of postProjection")
    )
  }

  /** Wraps the given metrics in a new [[SortMergeJoinMetricsUpdater]]. */
  override def genSortMergeJoinTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new SortMergeJoinMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the columnar broadcast exchange.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genColumnarBroadcastExchangeMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    // Metrics are created in the same order in which they are put into the map.
    val dataSize = SQLMetrics.createSizeMetric(sparkContext, "data size")
    val outputRows = SQLMetrics.createMetric(sparkContext, "number of output rows")
    val collectTime = SQLMetrics.createTimingMetric(sparkContext, "time to collect")
    val broadcastTime = SQLMetrics.createTimingMetric(sparkContext, "time to broadcast")

    Map(
      "dataSize" -> dataSize,
      "numOutputRows" -> outputRows,
      "collectTime" -> collectTime,
      "broadcastTime" -> broadcastTime)
  }

  /**
   * Creates the SQL metrics registered for the columnar subquery broadcast.
   *
   * All three entries are plain metrics created through `SQLMetrics.createMetric`; the unit is
   * only spelled out in the display label.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genColumnarSubqueryBroadcastMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def plain(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)

    Map(
      "dataSize" -> plain("data size (bytes)"),
      "numOutputRows" -> plain("number of output rows"),
      "collectTime" -> plain("time to collect (ms)"))
  }

  /**
   * Creates the SQL metrics registered for the hash join transformer, covering the hash build
   * and hash probe sides, the stream/build pre-projections, the post-projection and the overall
   * join output.
   *
   * The build-side pre-projection cpu count label carries a `build` prefix so that it can be
   * told apart from the stream-side label, mirroring the `stream`/`buffer` prefixes used by the
   * sort merge join metrics in this file.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  override def genHashJoinTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)
    def nanoTiming(label: String): SQLMetric =
      SQLMetrics.createNanoTimingMetric(sparkContext, label)

    Map(
      // Hash build side.
      "hashBuildInputRows" -> counter("number of hash build input rows"),
      "hashBuildOutputRows" -> counter("number of hash build output rows"),
      "hashBuildOutputVectors" -> counter("number of hash build output vectors"),
      "hashBuildOutputBytes" -> bytes("number of hash build output bytes"),
      "hashBuildCpuCount" -> counter("hash build cpu wall time count"),
      "hashBuildWallNanos" -> nanoTiming("totaltime of hash build"),
      "hashBuildPeakMemoryBytes" -> bytes("hash build peak memory bytes"),
      "hashBuildNumMemoryAllocations" -> counter("number of hash build memory allocations"),
      "hashBuildSpilledBytes" -> bytes("total bytes written for spilling of hash build"),
      "hashBuildSpilledRows" -> counter("total rows written for spilling of hash build"),
      "hashBuildSpilledPartitions" -> counter("total spilled partitions of hash build"),
      "hashBuildSpilledFiles" -> counter("total spilled files of hash build"),
      // Hash probe side.
      "hashProbeInputRows" -> counter("number of hash probe input rows"),
      "hashProbeOutputRows" -> counter("number of hash probe output rows"),
      "hashProbeOutputVectors" -> counter("number of hash probe output vectors"),
      "hashProbeOutputBytes" -> bytes("number of hash probe output bytes"),
      "hashProbeCpuCount" -> counter("hash probe cpu wall time count"),
      "hashProbeWallNanos" -> nanoTiming("totaltime of hash probe"),
      "hashProbePeakMemoryBytes" -> bytes("hash probe peak memory bytes"),
      "hashProbeNumMemoryAllocations" -> counter("number of hash probe memory allocations"),
      "hashProbeSpilledBytes" -> bytes("total bytes written for spilling of hash probe"),
      "hashProbeSpilledRows" -> counter("total rows written for spilling of hash probe"),
      "hashProbeSpilledPartitions" -> counter("total spilled partitions of hash probe"),
      "hashProbeSpilledFiles" -> counter("total spilled files of hash probe"),
      "hashProbeReplacedWithDynamicFilterRows" -> counter(
        "number of hash probe replaced with dynamic filter rows"),
      "hashProbeDynamicFiltersProduced" -> counter(
        "number of hash probe dynamic filters produced"),
      // Pre- and post-projections.
      "streamPreProjectionCpuCount" -> counter("stream preProject cpu wall time count"),
      "streamPreProjectionWallNanos" -> nanoTiming("totaltime of stream preProjection"),
      // Label names the build side explicitly so it is not confused with the stream side.
      "buildPreProjectionCpuCount" -> counter("build preProject cpu wall time count"),
      "buildPreProjectionWallNanos" -> nanoTiming("totaltime to build preProjection"),
      "postProjectionCpuCount" -> counter("postProject cpu wall time count"),
      "postProjectionWallNanos" -> nanoTiming("totaltime of postProjection"),
      // Overall join output.
      "numOutputRows" -> counter("number of output rows"),
      "numOutputVectors" -> counter("number of output vectors"),
      "numOutputBytes" -> bytes("number of output bytes")
    )
  }

  /** Wraps the given metrics in a new [[HashJoinMetricsUpdater]]. */
  override def genHashJoinTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new HashJoinMetricsUpdater(metrics)
  }

  /**
   * Creates the SQL metrics registered for the cartesian product transformer.
   *
   * @param sparkContext context the metrics are created against
   * @return the created metrics keyed by their internal metric name
   */
  def genCartesianProductTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    def counter(label: String): SQLMetric = SQLMetrics.createMetric(sparkContext, label)
    def bytes(label: String): SQLMetric = SQLMetrics.createSizeMetric(sparkContext, label)

    Map(
      "numOutputRows" -> counter("number of output rows"),
      "outputVectors" -> counter("number of output vectors"),
      "outputBytes" -> bytes("number of output bytes"),
      "wallNanos" ->
        SQLMetrics.createNanoTimingMetric(sparkContext, "totaltime of cartesian product"),
      "cpuCount" -> counter("cpu wall time count"),
      "peakMemoryBytes" -> bytes("peak memory bytes"),
      "numMemoryAllocations" -> counter("number of memory allocations")
    )
  }

  /** Wraps the given metrics in a new [[CartesianProductMetricsUpdater]]. */
  def genCartesianProductTransformerMetricsUpdater(
      metrics: Map[String, SQLMetric]): MetricsUpdater = {
    new CartesianProductMetricsUpdater(metrics)
  }

  /**
   * Creates the single output-row-count metric registered under `numOutputRows` for the
   * generate transformer.
   *
   * @param sparkContext context the metric is created against
   * @return a one-entry map holding the created metric
   */
  override def genGenerateTransformerMetrics(
      sparkContext: SparkContext): Map[String, SQLMetric] = {
    val outputRowCount = SQLMetrics.createMetric(sparkContext, "number of output rows")
    Map("numOutputRows" -> outputRowCount)
  }