in gluten-core/src/main/scala/io/glutenproject/execution/ExpandExecTransformer.scala [54:174]
/**
 * Returns the metrics updater for this operator, obtained from the active backend's metrics API
 * and bound to this operator's `metrics`.
 */
override def metricsUpdater(): MetricsUpdater = {
  val metricsApi = BackendsApiManager.getMetricsApiInstance
  metricsApi.genExpandTransformerMetricsUpdater(metrics)
}
/**
 * The group expressions evaluated by this operator can emit rows with arbitrary partitioning,
 * so no partitioning guarantee is advertised: the output is reported as UnknownPartitioning.
 */
override def outputPartitioning: Partitioning = {
  UnknownPartitioning(0)
}
/**
 * Builds the Substrait relation for an expand operation over `input`.
 *
 * If every projection is an Attribute or a Literal, the projections are converted directly and
 * wrapped in a single expand relation. Otherwise a project relation is inserted underneath: it
 * evaluates each distinct non-literal expression once, and the expand relation on top refers to
 * those results by ordinal while literals are emitted inline.
 *
 * @param context
 *   Substrait context; its registered-function map is handed to every expression transform, and
 *   the context itself is passed to the relation builders.
 * @param projections
 *   the projection sets of the expand; one inner sequence per set.
 * @param originalInputAttributes
 *   attributes of the child output, used to bind expressions and (for validation) to describe
 *   the input types.
 * @param operatorId
 *   operator id forwarded to the relation builders.
 * @param input
 *   the child relation.
 * @param validation
 *   when true, an extension node carrying the input attribute types is attached to the relation
 *   that sits directly on top of `input`.
 * @return
 *   the expand relation (stacked on a pre-projection when one is required).
 */
def getRelNode(
    context: SubstraitContext,
    projections: Seq[Seq[Expression]],
    originalInputAttributes: Seq[Attribute],
    operatorId: Long,
    input: RelNode,
    validation: Boolean): RelNode = {
  val args = context.registeredFunction

  // Packs the types of the input attributes into an extension node so that they travel with
  // the Substrait plan during validation.
  def inputTypesExtension() = {
    val inputTypeNodes = new JArrayList[TypeNode]()
    originalInputAttributes.foreach(
      attr => inputTypeNodes.add(ConverterUtils.getTypeNode(attr.dataType, attr.nullable)))
    ExtensionBuilder.makeAdvancedExtension(
      BackendsApiManager.getTransformerApiInstance.packPBMessage(
        TypeBuilder.makeStruct(false, inputTypeNodes).toProtobuf))
  }

  // A pre-projection is required as soon as one projection is neither an attribute
  // nor a literal.
  val requiresPreProjection = projections.exists(_.exists {
    case _: Attribute | _: Literal => false
    case _ => true
  })

  if (requiresPreProjection) {
    // Collect every distinct non-literal expression (compared with semanticEquals) and record,
    // for each projection slot, the ordinal of the expression that feeds it. Literal slots get
    // -1 as a placeholder; that value is never read because literals are emitted inline below.
    val distinctExprs = ArrayBuffer.empty[Expression]
    val ordinalsPerSet = ArrayBuffer.empty[Seq[Int]]
    projections.foreach {
      projectSet =>
        val ordinals = ArrayBuffer.empty[Int]
        projectSet.foreach {
          case _: Literal =>
            ordinals += -1
          case expr =>
            val known = distinctExprs.indexWhere(_.semanticEquals(expr))
            if (known >= 0) {
              ordinals += known
            } else {
              // The new expression lands at the current end of the buffer.
              ordinals += distinctExprs.size
              distinctExprs += expr
            }
        }
        ordinalsPerSet += ordinals
    }

    // Project relation computing the collected expressions.
    val preExprNodes = distinctExprs
      .map {
        expr =>
          ExpressionConverter
            .replaceWithExpressionTransformer(expr, originalInputAttributes)
            .doTransform(args)
      }
      .asJava
    // NOTE(review): presumably starting the emit at the input width means only the computed
    // expressions (not the original input columns) are output -- confirm against RelBuilder.
    val emitStartIndex = originalInputAttributes.size
    val inputRel = if (validation) {
      val extensionNode = inputTypesExtension()
      RelBuilder.makeProjectRel(
        input,
        preExprNodes,
        extensionNode,
        context,
        operatorId,
        emitStartIndex)
    } else {
      RelBuilder.makeProjectRel(input, preExprNodes, context, operatorId, emitStartIndex)
    }

    // Expand relation on top of the projection: literals are transformed in place, everything
    // else becomes a selection of the matching pre-projected column.
    val projectSetExprNodes = new JArrayList[JList[ExpressionNode]]()
    projections.zip(ordinalsPerSet).foreach {
      case (projectSet, ordinals) =>
        val exprNodes = new JArrayList[ExpressionNode]()
        projectSet.zip(ordinals).foreach {
          case (literal: Literal, _) =>
            exprNodes.add(LiteralTransformer(literal).doTransform(args))
          case (_, ordinal) =>
            exprNodes.add(ExpressionBuilder.makeSelection(ordinal))
        }
        projectSetExprNodes.add(exprNodes)
    }
    RelBuilder.makeExpandRel(inputRel, projectSetExprNodes, context, operatorId)
  } else {
    // Only attributes and literals: convert each projection as-is.
    val projectSetExprNodes = new JArrayList[JList[ExpressionNode]]()
    for (projectSet <- projections) {
      val exprNodes = new JArrayList[ExpressionNode]()
      for (project <- projectSet) {
        exprNodes.add(
          ExpressionConverter
            .replaceWithExpressionTransformer(project, originalInputAttributes)
            .doTransform(args))
      }
      projectSetExprNodes.add(exprNodes)
    }
    if (validation) {
      val extensionNode = inputTypesExtension()
      RelBuilder.makeExpandRel(input, projectSetExprNodes, extensionNode, context, operatorId)
    } else {
      RelBuilder.makeExpandRel(input, projectSetExprNodes, context, operatorId)
    }
  }
}