override def metricsUpdater()

in gluten-core/src/main/scala/io/glutenproject/execution/ExpandExecTransformer.scala [54:174]


  /**
   * Creates the [[MetricsUpdater]] for this expand transformer by handing `metrics` to the
   * backend's metrics API.
   */
  override def metricsUpdater(): MetricsUpdater = {
    val metricsApi = BackendsApiManager.getMetricsApiInstance
    metricsApi.genExpandTransformerMetricsUpdater(metrics)
  }

  /**
   * The group expressions can output data with arbitrary partitioning, so the result is always
   * reported as unknown partitioning.
   */
  override def outputPartitioning: Partitioning = UnknownPartitioning(numPartitions = 0)

  /**
   * Builds the Substrait rel for an expand operator on top of `input`.
   *
   * If every projection is an [[Attribute]] or a [[Literal]], the projections are converted
   * directly and a single expand rel is created. Otherwise a project rel is inserted first: each
   * distinct (by `semanticEquals`) non-literal projection is evaluated once by that project, and
   * the expand rel then refers to those columns by index while literals stay inline.
   *
   * @param context
   *   Substrait context; its `registeredFunction` is passed to every `doTransform` call.
   * @param projections
   *   the projection sets of the expand, one inner sequence per set.
   * @param originalInputAttributes
   *   attributes the projections are converted against; in validation mode their types are also
   *   packed into the extension node.
   * @param operatorId
   *   forwarded unchanged to the `RelBuilder` calls.
   * @param input
   *   the rel the expand (or the pre-project) is built on.
   * @param validation
   *   when true, the rel built directly on `input` carries an advanced extension with the input
   *   types.
   * @return
   *   the expand rel.
   */
  def getRelNode(
      context: SubstraitContext,
      projections: Seq[Seq[Expression]],
      originalInputAttributes: Seq[Attribute],
      operatorId: Long,
      input: RelNode,
      validation: Boolean): RelNode = {
    val args = context.registeredFunction

    // Use a extension node to send the input types through Substrait plan for a validation.
    // Shared by both branches below so the construction exists in one place only.
    def inputTypesExtension() = {
      val inputTypeNodeList = new java.util.ArrayList[TypeNode]()
      originalInputAttributes.foreach {
        attr => inputTypeNodeList.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable))
      }
      ExtensionBuilder.makeAdvancedExtension(
        BackendsApiManager.getTransformerApiInstance.packPBMessage(
          TypeBuilder.makeStruct(false, inputTypeNodeList).toProtobuf))
    }

    // A pre-projection is required as soon as one projection is neither an attribute nor a literal.
    val needsPreProjection = projections.exists {
      projectSet =>
        projectSet.exists {
          case _: Attribute | _: Literal => false
          case _ => true
        }
    }

    if (needsPreProjection) {
      // Some projection is a computed expression: add a project op that calculates all
      // non-literal projections before the expand op.
      //
      // preExprs holds the distinct non-literal projections in first-seen order. selectionMaps
      // mirrors the shape of `projections`: each slot is the index into preExprs, or -1 for a
      // literal (that placeholder is never read, literals are emitted inline below).
      // Strict foreach is used on purpose: the deduplication mutates preExprs, so it must be
      // complete before preExprs is converted.
      val preExprs = ArrayBuffer.empty[Expression]
      val selectionMaps = ArrayBuffer.empty[Seq[Int]]
      projections.foreach {
        projectSet =>
          val selections = ArrayBuffer.empty[Int]
          projectSet.foreach {
            case _: Literal =>
              selections += -1
            case proj =>
              val exprIdx = preExprs.indexWhere(_.semanticEquals(proj))
              if (exprIdx != -1) {
                selections += exprIdx
              } else {
                // The new expression lands at the current end of preExprs.
                selections += preExprs.size
                preExprs += proj
              }
          }
          selectionMaps += selections
      }

      // make project
      val preExprNodes = preExprs
        .map(
          ExpressionConverter
            .replaceWithExpressionTransformer(_, originalInputAttributes)
            .doTransform(args))
        .asJava

      // NOTE(review): emitStartIndex equals the input column count, presumably so the project
      // emits only the appended pre-expressions (the selection indices above are relative to
      // preExprs) - confirm against RelBuilder.makeProjectRel.
      val emitStartIndex = originalInputAttributes.size
      val inputRel = if (validation) {
        RelBuilder.makeProjectRel(
          input,
          preExprNodes,
          inputTypesExtension(),
          context,
          operatorId,
          emitStartIndex)
      } else {
        RelBuilder.makeProjectRel(input, preExprNodes, context, operatorId, emitStartIndex)
      }

      // make expand
      val projectSetExprNodes = new JArrayList[JList[ExpressionNode]]()
      projections.zip(selectionMaps).foreach {
        case (projectSet, selections) =>
          val projectExprNodes = new JArrayList[ExpressionNode]()
          projectSet.zip(selections).foreach {
            case (l: Literal, _) =>
              projectExprNodes.add(LiteralTransformer(l).doTransform(args))
            case (_, selection) =>
              projectExprNodes.add(ExpressionBuilder.makeSelection(selection))
          }
          projectSetExprNodes.add(projectExprNodes)
      }
      // Only the project rel carries the validation extension in this branch.
      RelBuilder.makeExpandRel(inputRel, projectSetExprNodes, context, operatorId)
    } else {
      // Only attributes and literals: convert every projection as-is.
      val projectSetExprNodes = new JArrayList[JList[ExpressionNode]]()
      for (projectSet <- projections) {
        val projectExprNodes = new JArrayList[ExpressionNode]()
        for (project <- projectSet) {
          projectExprNodes.add(
            ExpressionConverter
              .replaceWithExpressionTransformer(project, originalInputAttributes)
              .doTransform(args))
        }
        projectSetExprNodes.add(projectExprNodes)
      }

      if (validation) {
        val extensionNode = inputTypesExtension()
        RelBuilder.makeExpandRel(input, projectSetExprNodes, extensionNode, context, operatorId)
      } else {
        RelBuilder.makeExpandRel(input, projectSetExprNodes, context, operatorId)
      }
    }
  }