in spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala [517:2046]
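/**
 * Converts a Spark expression to a protobuf expression, or returns None if the
 * expression is not supported, in which case the reason is recorded via `withInfo`
 * and shows up in the extended plan info.
 *
 * @param expr the Spark expression to convert
 * @param inputs the attributes of the input plan, used to resolve attribute references
 * @param binding when true, attribute references are bound to ordinals in `inputs` and
 *        serialized as `BoundReference`; when false, they are serialized by name as
 *        `UnboundReference`
 */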
def exprToProtoInternal(
expr: Expression,
inputs: Seq[Attribute],
binding: Boolean): Option[Expr] = {
SQLConf.get
def convert(handler: CometExpressionSerde): Option[Expr] = {
handler match {
case _: IncompatExpr if !CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.get() =>
withInfo(
expr,
s"$expr is not fully compatible with Spark. To enable it anyway, set " +
s"${CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key}=true. ${CometConf.COMPAT_GUIDE}.")
None
case _ =>
handler.convert(expr, inputs, binding)
}
}
expr match {
case a @ Alias(_, _) =>
val r = exprToProtoInternal(a.child, inputs, binding)
if (r.isEmpty) {
withInfo(expr, a.child)
}
r
case cast @ Cast(_: Literal, dataType, _, _) =>
// This can happen after promoting decimal precisions
val value = cast.eval()
exprToProtoInternal(Literal(value, dataType), inputs, binding)
case UnaryExpression(child) if expr.prettyName == "trycast" =>
val timeZoneId = SQLConf.get.sessionLocalTimeZone
handleCast(
expr,
child,
inputs,
binding,
expr.dataType,
Some(timeZoneId),
CometEvalMode.TRY)
case c @ Cast(child, dt, timeZoneId, _) =>
handleCast(expr, child, inputs, binding, dt, timeZoneId, evalMode(c))
case add @ Add(left, right, _) if supportedDataType(left.dataType) =>
createMathExpression(
expr,
left,
right,
inputs,
binding,
add.dataType,
add.evalMode == EvalMode.ANSI,
(builder, mathExpr) => builder.setAdd(mathExpr))
case add @ Add(left, _, _) if !supportedDataType(left.dataType) =>
withInfo(add, s"Unsupported datatype ${left.dataType}")
None
case sub @ Subtract(left, right, _) if supportedDataType(left.dataType) =>
createMathExpression(
expr,
left,
right,
inputs,
binding,
sub.dataType,
sub.evalMode == EvalMode.ANSI,
(builder, mathExpr) => builder.setSubtract(mathExpr))
case sub @ Subtract(left, _, _) if !supportedDataType(left.dataType) =>
withInfo(sub, s"Unsupported datatype ${left.dataType}")
None
case mul @ Multiply(left, right, _) if supportedDataType(left.dataType) =>
createMathExpression(
expr,
left,
right,
inputs,
binding,
mul.dataType,
mul.evalMode == EvalMode.ANSI,
(builder, mathExpr) => builder.setMultiply(mathExpr))
case mul @ Multiply(left, _, _) =>
if (!supportedDataType(left.dataType)) {
withInfo(mul, s"Unsupported datatype ${left.dataType}")
}
None
case div @ Divide(left, right, _) if supportedDataType(left.dataType) =>
// DataFusion now throws an exception for division by zero
// See https://github.com/apache/arrow-datafusion/pull/6792
// For now, use NullIf to replace zeros with nulls.
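// Illustration: in Spark's default (non-ANSI) mode, `1 / 0` evaluates to NULL,
// so serializing `a / b` as `a / nullif(b, 0)` preserves that behavior.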
val rightExpr = nullIfWhenPrimitive(right)
createMathExpression(
expr,
left,
rightExpr,
inputs,
binding,
div.dataType,
div.evalMode == EvalMode.ANSI,
(builder, mathExpr) => builder.setDivide(mathExpr))
case div @ Divide(left, _, _) =>
if (!supportedDataType(left.dataType)) {
withInfo(div, s"Unsupported datatype ${left.dataType}")
}
None
case div @ IntegralDivide(left, right, _) if supportedDataType(left.dataType) =>
val rightExpr = nullIfWhenPrimitive(right)
val dataType = (left.dataType, right.dataType) match {
case (l: DecimalType, r: DecimalType) =>
// copied from IntegralDivide.resultDecimalType
val intDig = l.precision - l.scale + r.scale
DecimalType(min(if (intDig == 0) 1 else intDig, DecimalType.MAX_PRECISION), 0)
case _ => left.dataType
}
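// Worked example: Decimal(10, 2) div Decimal(5, 1) gives intDig = 10 - 2 + 1 = 9,
// so the intermediate result type is Decimal(9, 0) before the final cast to long.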
val divideExpr = createMathExpression(
expr,
left,
rightExpr,
inputs,
binding,
dataType,
div.evalMode == EvalMode.ANSI,
(builder, mathExpr) => builder.setIntegralDivide(mathExpr))
if (divideExpr.isDefined) {
val childExpr = if (dataType.isInstanceOf[DecimalType]) {
// check overflow for decimal type
val builder = ExprOuterClass.CheckOverflow.newBuilder()
builder.setChild(divideExpr.get)
builder.setFailOnError(div.evalMode == EvalMode.ANSI)
builder.setDatatype(serializeDataType(dataType).get)
Some(
ExprOuterClass.Expr
.newBuilder()
.setCheckOverflow(builder)
.build())
} else {
divideExpr
}
// Spark's IntegralDivide always returns LongType, so cast the result to long
castToProto(expr, None, LongType, childExpr.get, CometEvalMode.LEGACY)
} else {
None
}
case div @ IntegralDivide(left, _, _) =>
if (!supportedDataType(left.dataType)) {
withInfo(div, s"Unsupported datatype ${left.dataType}")
}
None
case rem @ Remainder(left, right, _) if supportedDataType(left.dataType) =>
val rightExpr = nullIfWhenPrimitive(right)
createMathExpression(
expr,
left,
rightExpr,
inputs,
binding,
rem.dataType,
rem.evalMode == EvalMode.ANSI,
(builder, mathExpr) => builder.setRemainder(mathExpr))
case rem @ Remainder(left, _, _) =>
if (!supportedDataType(left.dataType)) {
withInfo(rem, s"Unsupported datatype ${left.dataType}")
}
None
case EqualTo(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setEq(binaryExpr))
case Not(EqualTo(left, right)) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setNeq(binaryExpr))
case EqualNullSafe(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setEqNullSafe(binaryExpr))
case Not(EqualNullSafe(left, right)) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setNeqNullSafe(binaryExpr))
case GreaterThan(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setGt(binaryExpr))
case GreaterThanOrEqual(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setGtEq(binaryExpr))
case LessThan(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setLt(binaryExpr))
case LessThanOrEqual(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setLtEq(binaryExpr))
case Literal(value, dataType)
if supportedDataType(dataType, allowComplex = value == null) =>
val exprBuilder = ExprOuterClass.Literal.newBuilder()
if (value == null) {
exprBuilder.setIsNull(true)
} else {
exprBuilder.setIsNull(false)
dataType match {
case _: BooleanType => exprBuilder.setBoolVal(value.asInstanceOf[Boolean])
case _: ByteType => exprBuilder.setByteVal(value.asInstanceOf[Byte])
case _: ShortType => exprBuilder.setShortVal(value.asInstanceOf[Short])
case _: IntegerType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case _: LongType => exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: FloatType => exprBuilder.setFloatVal(value.asInstanceOf[Float])
case _: DoubleType => exprBuilder.setDoubleVal(value.asInstanceOf[Double])
case _: StringType =>
exprBuilder.setStringVal(value.asInstanceOf[UTF8String].toString)
case _: TimestampType => exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: TimestampNTZType => exprBuilder.setLongVal(value.asInstanceOf[Long])
case _: DecimalType =>
// Pass decimal literal as bytes.
val unscaled = value.asInstanceOf[Decimal].toBigDecimal.underlying.unscaledValue
exprBuilder.setDecimalVal(
com.google.protobuf.ByteString.copyFrom(unscaled.toByteArray))
case _: BinaryType =>
val byteStr =
com.google.protobuf.ByteString.copyFrom(value.asInstanceOf[Array[Byte]])
exprBuilder.setBytesVal(byteStr)
case _: DateType => exprBuilder.setIntVal(value.asInstanceOf[Int])
case dt =>
logWarning(s"Unexpected datatype '$dt' for literal value '$value'")
}
}
val dt = serializeDataType(dataType)
if (dt.isDefined) {
exprBuilder.setDatatype(dt.get)
Some(
ExprOuterClass.Expr
.newBuilder()
.setLiteral(exprBuilder)
.build())
} else {
withInfo(expr, s"Unsupported datatype $dataType")
None
}
case Literal(_, dataType) if !supportedDataType(dataType) =>
withInfo(expr, s"Unsupported datatype $dataType")
None
case Substring(str, Literal(pos, _), Literal(len, _)) =>
val strExpr = exprToProtoInternal(str, inputs, binding)
if (strExpr.isDefined) {
val builder = ExprOuterClass.Substring.newBuilder()
builder.setChild(strExpr.get)
builder.setStart(pos.asInstanceOf[Int])
builder.setLen(len.asInstanceOf[Int])
Some(
ExprOuterClass.Expr
.newBuilder()
.setSubstring(builder)
.build())
} else {
withInfo(expr, str)
None
}
case StructsToJson(options, child, timezoneId) =>
if (options.nonEmpty) {
withInfo(expr, "StructsToJson with options is not supported")
None
} else {
def isSupportedType(dt: DataType): Boolean = {
dt match {
case StructType(fields) =>
fields.forall(f => isSupportedType(f.dataType))
case DataTypes.BooleanType | DataTypes.ByteType | DataTypes.ShortType |
DataTypes.IntegerType | DataTypes.LongType | DataTypes.FloatType |
DataTypes.DoubleType | DataTypes.StringType =>
true
case DataTypes.DateType | DataTypes.TimestampType =>
// TODO implement these types with tests for formatting options and timezone
false
case _: MapType | _: ArrayType =>
// Spark supports map and array in StructsToJson but this is not yet
// implemented in Comet
false
case _ => false
}
}
val isSupported = child.dataType match {
case s: StructType =>
s.fields.forall(f => isSupportedType(f.dataType))
case _: MapType | _: ArrayType =>
// Spark supports map and array in StructsToJson but this is not yet
// implemented in Comet
false
case _ =>
false
}
if (isSupported) {
exprToProtoInternal(child, inputs, binding) match {
case Some(p) =>
val toJson = ExprOuterClass.ToJson
.newBuilder()
.setChild(p)
.setTimezone(timezoneId.getOrElse("UTC"))
.setIgnoreNullFields(true)
.build()
Some(
ExprOuterClass.Expr
.newBuilder()
.setToJson(toJson)
.build())
case _ =>
withInfo(expr, child)
None
}
} else {
withInfo(expr, "Unsupported data type", child)
None
}
}
case Like(left, right, escapeChar) =>
if (escapeChar == '\\') {
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setLike(binaryExpr))
} else {
// TODO custom escape char
withInfo(expr, s"custom escape character $escapeChar not supported in LIKE")
None
}
case RLike(left, right) =>
// we currently only support scalar regex patterns
right match {
case Literal(pattern, DataTypes.StringType) =>
if (!RegExp.isSupportedPattern(pattern.toString) &&
!CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
withInfo(
expr,
s"Regexp pattern $pattern is not compatible with Spark. " +
s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
"to allow it anyway.")
return None
}
case _ =>
withInfo(expr, "Only scalar regexp patterns are supported")
return None
}
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setRlike(binaryExpr))
case StartsWith(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setStartsWith(binaryExpr))
case EndsWith(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setEndsWith(binaryExpr))
case Contains(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setContains(binaryExpr))
case StringSpace(child) =>
createUnaryExpr(
expr,
child,
inputs,
binding,
(builder, unaryExpr) => builder.setStringSpace(unaryExpr))
case Hour(child, timeZoneId) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
if (childExpr.isDefined) {
val builder = ExprOuterClass.Hour.newBuilder()
builder.setChild(childExpr.get)
val timeZone = timeZoneId.getOrElse("UTC")
builder.setTimezone(timeZone)
Some(
ExprOuterClass.Expr
.newBuilder()
.setHour(builder)
.build())
} else {
withInfo(expr, child)
None
}
case Minute(child, timeZoneId) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
if (childExpr.isDefined) {
val builder = ExprOuterClass.Minute.newBuilder()
builder.setChild(childExpr.get)
val timeZone = timeZoneId.getOrElse("UTC")
builder.setTimezone(timeZone)
Some(
ExprOuterClass.Expr
.newBuilder()
.setMinute(builder)
.build())
} else {
withInfo(expr, child)
None
}
case DateAdd(left, right) =>
val leftExpr = exprToProtoInternal(left, inputs, binding)
val rightExpr = exprToProtoInternal(right, inputs, binding)
val optExpr =
scalarFunctionExprToProtoWithReturnType("date_add", DateType, leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, left, right)
case DateSub(left, right) =>
val leftExpr = exprToProtoInternal(left, inputs, binding)
val rightExpr = exprToProtoInternal(right, inputs, binding)
val optExpr =
scalarFunctionExprToProtoWithReturnType("date_sub", DateType, leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, left, right)
case TruncDate(child, format) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val formatExpr = exprToProtoInternal(format, inputs, binding)
if (childExpr.isDefined && formatExpr.isDefined) {
val builder = ExprOuterClass.TruncDate.newBuilder()
builder.setChild(childExpr.get)
builder.setFormat(formatExpr.get)
Some(
ExprOuterClass.Expr
.newBuilder()
.setTruncDate(builder)
.build())
} else {
withInfo(expr, child, format)
None
}
case TruncTimestamp(format, child, timeZoneId) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val formatExpr = exprToProtoInternal(format, inputs, binding)
if (childExpr.isDefined && formatExpr.isDefined) {
val builder = ExprOuterClass.TruncTimestamp.newBuilder()
builder.setChild(childExpr.get)
builder.setFormat(formatExpr.get)
val timeZone = timeZoneId.getOrElse("UTC")
builder.setTimezone(timeZone)
Some(
ExprOuterClass.Expr
.newBuilder()
.setTruncTimestamp(builder)
.build())
} else {
withInfo(expr, child, format)
None
}
case Second(child, timeZoneId) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
if (childExpr.isDefined) {
val builder = ExprOuterClass.Second.newBuilder()
builder.setChild(childExpr.get)
val timeZone = timeZoneId.getOrElse("UTC")
builder.setTimezone(timeZone)
Some(
ExprOuterClass.Expr
.newBuilder()
.setSecond(builder)
.build())
} else {
withInfo(expr, child)
None
}
case Year(child) =>
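// year(x) is serialized as datepart('year', x) followed by a cast to IntegerType
// to match Spark's return type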
val periodType = exprToProtoInternal(Literal("year"), inputs, binding)
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("datepart", Seq(periodType, childExpr): _*)
.map(e => {
Expr
.newBuilder()
.setCast(
ExprOuterClass.Cast
.newBuilder()
.setChild(e)
.setDatatype(serializeDataType(IntegerType).get)
.setEvalMode(ExprOuterClass.EvalMode.LEGACY)
.setAllowIncompat(false)
.build())
.build()
})
optExprWithInfo(optExpr, expr, child)
case IsNull(child) =>
createUnaryExpr(
expr,
child,
inputs,
binding,
(builder, unaryExpr) => builder.setIsNull(unaryExpr))
case IsNotNull(child) =>
createUnaryExpr(
expr,
child,
inputs,
binding,
(builder, unaryExpr) => builder.setIsNotNull(unaryExpr))
case IsNaN(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr =
scalarFunctionExprToProtoWithReturnType("isnan", BooleanType, childExpr)
optExprWithInfo(optExpr, expr, child)
case SortOrder(child, direction, nullOrdering, _) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
if (childExpr.isDefined) {
val sortOrderBuilder = ExprOuterClass.SortOrder.newBuilder()
sortOrderBuilder.setChild(childExpr.get)
direction match {
case Ascending => sortOrderBuilder.setDirectionValue(0)
case Descending => sortOrderBuilder.setDirectionValue(1)
}
nullOrdering match {
case NullsFirst => sortOrderBuilder.setNullOrderingValue(0)
case NullsLast => sortOrderBuilder.setNullOrderingValue(1)
}
Some(
ExprOuterClass.Expr
.newBuilder()
.setSortOrder(sortOrderBuilder)
.build())
} else {
withInfo(expr, child)
None
}
case And(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setAnd(binaryExpr))
case Or(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setOr(binaryExpr))
case UnaryExpression(child) if expr.prettyName == "promote_precision" =>
// `UnaryExpression` includes `PromotePrecision` for Spark 3.3.
// `PromotePrecision` is just a wrapper, so there is no need to serialize it.
exprToProtoInternal(child, inputs, binding)
case CheckOverflow(child, dt, nullOnOverflow) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
if (childExpr.isDefined) {
val builder = ExprOuterClass.CheckOverflow.newBuilder()
builder.setChild(childExpr.get)
builder.setFailOnError(!nullOnOverflow)
// `dataType` must be a decimal type
val dataType = serializeDataType(dt)
builder.setDatatype(dataType.get)
Some(
ExprOuterClass.Expr
.newBuilder()
.setCheckOverflow(builder)
.build())
} else {
withInfo(expr, child)
None
}
case attr: AttributeReference =>
val dataType = serializeDataType(attr.dataType)
if (dataType.isDefined) {
if (binding) {
// Spark may produce unresolvable attributes in some cases,
// for example https://github.com/apache/datafusion-comet/issues/925.
// So, we allow the binding to fail.
val boundRef: Any = BindReferences
.bindReference(attr, inputs, allowFailures = true)
if (boundRef.isInstanceOf[AttributeReference]) {
withInfo(attr, s"cannot resolve $attr among ${inputs.mkString(", ")}")
return None
}
val boundExpr = ExprOuterClass.BoundReference
.newBuilder()
.setIndex(boundRef.asInstanceOf[BoundReference].ordinal)
.setDatatype(dataType.get)
.build()
Some(
ExprOuterClass.Expr
.newBuilder()
.setBound(boundExpr)
.build())
} else {
val unboundRef = ExprOuterClass.UnboundReference
.newBuilder()
.setName(attr.name)
.setDatatype(dataType.get)
.build()
Some(
ExprOuterClass.Expr
.newBuilder()
.setUnbound(unboundRef)
.build())
}
} else {
withInfo(attr, s"unsupported datatype: ${attr.dataType}")
None
}
// abs implementation is not correct
// https://github.com/apache/datafusion-comet/issues/666
// case Abs(child, failOnErr) =>
// val childExpr = exprToProtoInternal(child, inputs)
// if (childExpr.isDefined) {
// val evalModeStr =
// if (failOnErr) ExprOuterClass.EvalMode.ANSI else ExprOuterClass.EvalMode.LEGACY
// val absBuilder = ExprOuterClass.Abs.newBuilder()
// absBuilder.setChild(childExpr.get)
// absBuilder.setEvalMode(evalModeStr)
// Some(Expr.newBuilder().setAbs(absBuilder).build())
// } else {
// withInfo(expr, child)
// None
// }
case Acos(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("acos", childExpr)
optExprWithInfo(optExpr, expr, child)
case Asin(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("asin", childExpr)
optExprWithInfo(optExpr, expr, child)
case Atan(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("atan", childExpr)
optExprWithInfo(optExpr, expr, child)
case Atan2(left, right) =>
val leftExpr = exprToProtoInternal(left, inputs, binding)
val rightExpr = exprToProtoInternal(right, inputs, binding)
val optExpr = scalarFunctionExprToProto("atan2", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, left, right)
case Hex(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr =
scalarFunctionExprToProtoWithReturnType("hex", StringType, childExpr)
optExprWithInfo(optExpr, expr, child)
case e: Unhex =>
val unHex = unhexSerde(e)
val childExpr = exprToProtoInternal(unHex._1, inputs, binding)
val failOnErrorExpr = exprToProtoInternal(unHex._2, inputs, binding)
val optExpr =
scalarFunctionExprToProtoWithReturnType("unhex", e.dataType, childExpr, failOnErrorExpr)
optExprWithInfo(optExpr, expr, unHex._1)
case e @ Ceil(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
child.dataType match {
case t: DecimalType if t.scale == 0 => // zero scale is no-op
childExpr
case t: DecimalType if t.scale < 0 => // Spark disallows negative scale SPARK-30252
withInfo(e, s"Decimal type $t has negative scale")
None
case _ =>
val optExpr = scalarFunctionExprToProtoWithReturnType("ceil", e.dataType, childExpr)
optExprWithInfo(optExpr, expr, child)
}
case Cos(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("cos", childExpr)
optExprWithInfo(optExpr, expr, child)
case Exp(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("exp", childExpr)
optExprWithInfo(optExpr, expr, child)
case e @ Floor(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
child.dataType match {
case t: DecimalType if t.scale == 0 => // zero scale is no-op
childExpr
case t: DecimalType if t.scale < 0 => // Spark disallows negative scale SPARK-30252
withInfo(e, s"Decimal type $t has negative scale")
None
case _ =>
val optExpr = scalarFunctionExprToProtoWithReturnType("floor", e.dataType, childExpr)
optExprWithInfo(optExpr, expr, child)
}
// The expression for `log` functions is defined as null on numbers less than or equal
// to 0. This matches Spark and Hive behavior, where non-positive values evaluate to
// null instead of NaN or -Infinity.
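// For example, log(0) and log(-1) both evaluate to NULL, as in Spark, rather than
// returning -Infinity or NaN.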
case Log(child) =>
val childExpr = exprToProtoInternal(nullIfNegative(child), inputs, binding)
val optExpr = scalarFunctionExprToProto("ln", childExpr)
optExprWithInfo(optExpr, expr, child)
case Log10(child) =>
val childExpr = exprToProtoInternal(nullIfNegative(child), inputs, binding)
val optExpr = scalarFunctionExprToProto("log10", childExpr)
optExprWithInfo(optExpr, expr, child)
case Log2(child) =>
val childExpr = exprToProtoInternal(nullIfNegative(child), inputs, binding)
val optExpr = scalarFunctionExprToProto("log2", childExpr)
optExprWithInfo(optExpr, expr, child)
case Pow(left, right) =>
val leftExpr = exprToProtoInternal(left, inputs, binding)
val rightExpr = exprToProtoInternal(right, inputs, binding)
val optExpr = scalarFunctionExprToProto("pow", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, left, right)
case r: Round =>
// _scale is a constant, copied from Spark's RoundBase because it is a protected val
val scaleV: Any = r.scale.eval(EmptyRow)
val _scale: Int = scaleV.asInstanceOf[Int]
lazy val childExpr = exprToProtoInternal(r.child, inputs, binding)
r.child.dataType match {
case t: DecimalType if t.scale < 0 => // Spark disallows negative scale SPARK-30252
withInfo(r, "Decimal type has negative scale")
None
case _ if scaleV == null =>
exprToProtoInternal(Literal(null), inputs, binding)
case _: ByteType | ShortType | IntegerType | LongType if _scale >= 0 =>
childExpr // _scale (i.e. decimal places) >= 0 is a no-op for integer types in Spark
case _: FloatType | DoubleType =>
// We cannot properly match Spark's behavior for floating-point numbers.
// Spark uses BigDecimal for rounding float/double, and BigDecimal first converts a
// double to a string internally in order to create its own internal representation.
// The problem is that BigDecimal uses java.lang.Double.toString(), which has a
// complicated rounding algorithm. E.g. -5.81855622136895E8 is actually
// -581855622.13689494132995605468750. Note the 5th fractional digit is 4 rather
// than 5. Java (Scala)'s toString() rounds it up to -581855622.136895, which makes
// a difference when rounding at the 5th digit: round(-5.81855622136895E8, 5) should
// be -5.818556221369E8 rather than -5.8185562213689E8. There are also cases where
// toString() does NOT round up: 6.1317116247283497E18 is 6131711624728349696, which
// could be rounded up to 6.13171162472835E18 while still representing the same
// double (i.e. 6.13171162472835E18 == 6.1317116247283497E18), but toString() does
// not do so. That results in round(6.1317116247283497E18, -5) ==
// 6.1317116247282995E18 instead of 6.1317116247283999E18.
withInfo(r, "Comet does not support Spark's BigDecimal rounding")
None
case _ =>
// `scale` must be Int64 type in DataFusion
val scaleExpr = exprToProtoInternal(Literal(_scale.toLong, LongType), inputs, binding)
val optExpr =
scalarFunctionExprToProtoWithReturnType("round", r.dataType, childExpr, scaleExpr)
optExprWithInfo(optExpr, expr, r.child)
}
// TODO enable once https://github.com/apache/datafusion/issues/11557 is fixed or
// when we have a Spark-compatible version implemented in Comet
// case Signum(child) =>
// val childExpr = exprToProtoInternal(child, inputs)
// val optExpr = scalarExprToProto("signum", childExpr)
// optExprWithInfo(optExpr, expr, child)
case Sin(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("sin", childExpr)
optExprWithInfo(optExpr, expr, child)
case Sqrt(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("sqrt", childExpr)
optExprWithInfo(optExpr, expr, child)
case Tan(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("tan", childExpr)
optExprWithInfo(optExpr, expr, child)
case Ascii(child) =>
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("ascii", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
case s: StringDecode =>
// Right child is the encoding expression.
s.right match {
case Literal(str, DataTypes.StringType)
if str.toString.toLowerCase(Locale.ROOT) == "utf-8" =>
// decode(col, 'utf-8') can be treated as a cast with "try" eval mode that puts nulls
// for invalid strings.
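// e.g. decode(binaryCol, 'utf-8') behaves like try_cast(binaryCol AS STRING):
// invalid UTF-8 input produces NULL.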
// Left child is the binary expression.
castToProto(
expr,
None,
DataTypes.StringType,
exprToProtoInternal(s.left, inputs, binding).get,
CometEvalMode.TRY)
case _ =>
withInfo(expr, "Comet only supports decoding with 'utf-8'.")
None
}
case RegExpReplace(subject, pattern, replacement, startPosition) =>
if (!RegExp.isSupportedPattern(pattern.toString) &&
!CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.get()) {
withInfo(
expr,
s"Regexp pattern $pattern is not compatible with Spark. " +
s"Set ${CometConf.COMET_REGEXP_ALLOW_INCOMPATIBLE.key}=true " +
"to allow it anyway.")
return None
}
startPosition match {
case Literal(value, DataTypes.IntegerType) if value == 1 =>
val subjectExpr = exprToProtoInternal(subject, inputs, binding)
val patternExpr = exprToProtoInternal(pattern, inputs, binding)
val replacementExpr = exprToProtoInternal(replacement, inputs, binding)
// DataFusion's regexp_replace stops at the first match. We need to add the 'g' flag
// to apply the regex globally to match Spark behavior.
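// e.g. regexp_replace('aaa', 'a', 'b') must return 'bbb' (all matches replaced),
// not 'baa'.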
val flagsExpr = exprToProtoInternal(Literal("g"), inputs, binding)
val optExpr = scalarFunctionExprToProto(
"regexp_replace",
subjectExpr,
patternExpr,
replacementExpr,
flagsExpr)
optExprWithInfo(optExpr, expr, subject, pattern, replacement, startPosition)
case _ =>
withInfo(expr, "Comet only supports regexp_replace with an offset of 1 (no offset).")
None
}
case BitLength(child) =>
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("bit_length", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
case If(predicate, trueValue, falseValue) =>
val predicateExpr = exprToProtoInternal(predicate, inputs, binding)
val trueExpr = exprToProtoInternal(trueValue, inputs, binding)
val falseExpr = exprToProtoInternal(falseValue, inputs, binding)
if (predicateExpr.isDefined && trueExpr.isDefined && falseExpr.isDefined) {
val builder = ExprOuterClass.IfExpr.newBuilder()
builder.setIfExpr(predicateExpr.get)
builder.setTrueExpr(trueExpr.get)
builder.setFalseExpr(falseExpr.get)
Some(
ExprOuterClass.Expr
.newBuilder()
.setIf(builder)
.build())
} else {
withInfo(expr, predicate, trueValue, falseValue)
None
}
case CaseWhen(branches, elseValue) =>
var allBranches: Seq[Expression] = Seq()
val whenSeq = branches.map(elements => {
allBranches = allBranches :+ elements._1
exprToProtoInternal(elements._1, inputs, binding)
})
val thenSeq = branches.map(elements => {
allBranches = allBranches :+ elements._2
exprToProtoInternal(elements._2, inputs, binding)
})
assert(whenSeq.length == thenSeq.length)
if (whenSeq.forall(_.isDefined) && thenSeq.forall(_.isDefined)) {
val builder = ExprOuterClass.CaseWhen.newBuilder()
builder.addAllWhen(whenSeq.map(_.get).asJava)
builder.addAllThen(thenSeq.map(_.get).asJava)
if (elseValue.isDefined) {
val elseValueExpr =
exprToProtoInternal(elseValue.get, inputs, binding)
if (elseValueExpr.isDefined) {
builder.setElseExpr(elseValueExpr.get)
} else {
withInfo(expr, elseValue.get)
return None
}
}
Some(
ExprOuterClass.Expr
.newBuilder()
.setCaseWhen(builder)
.build())
} else {
withInfo(expr, allBranches: _*)
None
}
case ConcatWs(children) =>
var childExprs: Seq[Expression] = Seq()
val exprs = children.map(e => {
val castExpr = Cast(e, StringType)
childExprs = childExprs :+ castExpr
exprToProtoInternal(castExpr, inputs, binding)
})
val optExpr = scalarFunctionExprToProto("concat_ws", exprs: _*)
optExprWithInfo(optExpr, expr, childExprs: _*)
case Chr(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("chr", childExpr)
optExprWithInfo(optExpr, expr, child)
case InitCap(child) =>
if (CometConf.COMET_EXEC_INITCAP_ENABLED.get()) {
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("initcap", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
} else {
withInfo(
expr,
"Comet initCap is not compatible with Spark yet. " +
"See https://github.com/apache/datafusion-comet/issues/1052 ." +
s"Set ${CometConf.COMET_EXEC_INITCAP_ENABLED.key}=true to enable it anyway.")
None
}
case Length(child) =>
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("length", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
case Md5(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProto("md5", childExpr)
optExprWithInfo(optExpr, expr, child)
case OctetLength(child) =>
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("octet_length", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
case Reverse(child) =>
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("reverse", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
case StringInstr(str, substr) =>
val leftCast = Cast(str, StringType)
val rightCast = Cast(substr, StringType)
val leftExpr = exprToProtoInternal(leftCast, inputs, binding)
val rightExpr = exprToProtoInternal(rightCast, inputs, binding)
val optExpr = scalarFunctionExprToProto("strpos", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, leftCast, rightCast)
case StringRepeat(str, times) =>
val leftCast = Cast(str, StringType)
val rightCast = Cast(times, LongType)
val leftExpr = exprToProtoInternal(leftCast, inputs, binding)
val rightExpr = exprToProtoInternal(rightCast, inputs, binding)
val optExpr = scalarFunctionExprToProto("repeat", leftExpr, rightExpr)
optExprWithInfo(optExpr, expr, leftCast, rightCast)
case StringReplace(src, search, replace) =>
val srcCast = Cast(src, StringType)
val searchCast = Cast(search, StringType)
val replaceCast = Cast(replace, StringType)
val srcExpr = exprToProtoInternal(srcCast, inputs, binding)
val searchExpr = exprToProtoInternal(searchCast, inputs, binding)
val replaceExpr = exprToProtoInternal(replaceCast, inputs, binding)
val optExpr = scalarFunctionExprToProto("replace", srcExpr, searchExpr, replaceExpr)
optExprWithInfo(optExpr, expr, srcCast, searchCast, replaceCast)
case StringTranslate(src, matching, replace) =>
val srcCast = Cast(src, StringType)
val matchingCast = Cast(matching, StringType)
val replaceCast = Cast(replace, StringType)
val srcExpr = exprToProtoInternal(srcCast, inputs, binding)
val matchingExpr = exprToProtoInternal(matchingCast, inputs, binding)
val replaceExpr = exprToProtoInternal(replaceCast, inputs, binding)
val optExpr = scalarFunctionExprToProto("translate", srcExpr, matchingExpr, replaceExpr)
optExprWithInfo(optExpr, expr, srcCast, matchingCast, replaceCast)
case StringTrim(srcStr, trimStr) =>
trim(expr, srcStr, trimStr, inputs, binding, "trim")
case StringTrimLeft(srcStr, trimStr) =>
trim(expr, srcStr, trimStr, inputs, binding, "ltrim")
case StringTrimRight(srcStr, trimStr) =>
trim(expr, srcStr, trimStr, inputs, binding, "rtrim")
case StringTrimBoth(srcStr, trimStr, _) =>
trim(expr, srcStr, trimStr, inputs, binding, "btrim")
case Upper(child) =>
if (CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("upper", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
} else {
withInfo(
expr,
"Comet is not compatible with Spark for case conversion in " +
s"locale-specific cases. Set ${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true " +
"to enable it anyway.")
None
}
case Lower(child) =>
if (CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
val castExpr = Cast(child, StringType)
val childExpr = exprToProtoInternal(castExpr, inputs, binding)
val optExpr = scalarFunctionExprToProto("lower", childExpr)
optExprWithInfo(optExpr, expr, castExpr)
} else {
withInfo(
expr,
"Comet is not compatible with Spark for case conversion in " +
s"locale-specific cases. Set ${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true " +
"to enable it anyway.")
None
}
case BitwiseAnd(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setBitwiseAnd(binaryExpr))
case BitwiseNot(child) =>
createUnaryExpr(
expr,
child,
inputs,
binding,
(builder, unaryExpr) => builder.setBitwiseNot(unaryExpr))
case BitwiseOr(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setBitwiseOr(binaryExpr))
case BitwiseXor(left, right) =>
createBinaryExpr(
expr,
left,
right,
inputs,
binding,
(builder, binaryExpr) => builder.setBitwiseXor(binaryExpr))
case ShiftRight(left, right) =>
// DataFusion's bitwise shift right expression requires the left and right
// sides to have the same data type
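// e.g. for shiftright(longCol, 2) the INT shift amount is cast to LongType below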
val rightExpression = if (left.dataType == LongType) {
Cast(right, LongType)
} else {
right
}
createBinaryExpr(
expr,
left,
rightExpression,
inputs,
binding,
(builder, binaryExpr) => builder.setBitwiseShiftRight(binaryExpr))
case ShiftLeft(left, right) =>
// DataFusion's bitwise shift left expression requires the left and right
// sides to have the same data type
val rightExpression = if (left.dataType == LongType) {
Cast(right, LongType)
} else {
right
}
createBinaryExpr(
expr,
left,
rightExpression,
inputs,
binding,
(builder, binaryExpr) => builder.setBitwiseShiftLeft(binaryExpr))
case In(value, list) =>
in(expr, value, list, inputs, binding, negate = false)
case InSet(value, hset) =>
val valueDataType = value.dataType
val list = hset.map { setVal =>
Literal(setVal, valueDataType)
}.toSeq
// Convert the `InSet` to an `In` expression;
// Spark's `InSet` optimization is performed on the native (DataFusion) side.
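// e.g. `col IN (1, 2, 3, ...)` with enough values to exceed Spark's
// spark.sql.optimizer.inSetConversionThreshold arrives here as InSet and is
// expanded back into a list of literals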
in(expr, value, list, inputs, binding, negate = false)
case Not(In(value, list)) =>
in(expr, value, list, inputs, binding, negate = true)
case Not(child) =>
createUnaryExpr(
expr,
child,
inputs,
binding,
(builder, unaryExpr) => builder.setNot(unaryExpr))
case UnaryMinus(child, failOnError) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
if (childExpr.isDefined) {
val builder = ExprOuterClass.UnaryMinus.newBuilder()
builder.setChild(childExpr.get)
builder.setFailOnError(failOnError)
Some(
ExprOuterClass.Expr
.newBuilder()
.setUnaryMinus(builder)
.build())
} else {
withInfo(expr, child)
None
}
case a @ Coalesce(_) =>
val exprChildren = a.children.map(exprToProtoInternal(_, inputs, binding))
scalarFunctionExprToProto("coalesce", exprChildren: _*)
// With Spark 3.4, CharVarcharCodegenUtils.readSidePadding gets called to pad spaces for
// char types.
// See https://github.com/apache/spark/pull/38151
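// e.g. reading the value "ab" from a CHAR(5) column yields "ab   ",
// padded with spaces to length 5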
case s: StaticInvoke
// classOf gets the runtime class of T, which lets us compare directly.
// Otherwise isInstanceOf[Class[T]] will always evaluate to true for Class[_]
if s.staticObject == classOf[CharVarcharCodegenUtils] &&
s.dataType.isInstanceOf[StringType] &&
s.functionName == "readSidePadding" &&
s.arguments.size == 2 &&
s.propagateNull &&
!s.returnNullable &&
s.isDeterministic =>
val argsExpr = Seq(
exprToProtoInternal(Cast(s.arguments(0), StringType), inputs, binding),
exprToProtoInternal(s.arguments(1), inputs, binding))
if (argsExpr.forall(_.isDefined)) {
scalarFunctionExprToProto("read_side_padding", argsExpr: _*)
} else {
withInfo(expr, s.arguments: _*)
None
}
// read-side padding in Spark 3.5.2+ is represented by the rpad function
case StringRPad(srcStr, size, chars) =>
chars match {
case Literal(str, DataTypes.StringType) if str.toString == " " =>
val arg0 = exprToProtoInternal(srcStr, inputs, binding)
val arg1 = exprToProtoInternal(size, inputs, binding)
if (arg0.isDefined && arg1.isDefined) {
scalarFunctionExprToProto("rpad", arg0, arg1)
} else {
withInfo(expr, "rpad unsupported arguments", srcStr, size)
None
}
case _ =>
withInfo(expr, "rpad only supports padding with spaces")
None
}
case KnownFloatingPointNormalized(NormalizeNaNAndZero(expr)) =>
val dataType = serializeDataType(expr.dataType)
if (dataType.isEmpty) {
withInfo(expr, s"Unsupported datatype ${expr.dataType}")
return None
}
val ex = exprToProtoInternal(expr, inputs, binding)
ex.map { child =>
val builder = ExprOuterClass.NormalizeNaNAndZero
.newBuilder()
.setChild(child)
.setDatatype(dataType.get)
ExprOuterClass.Expr.newBuilder().setNormalizeNanAndZero(builder).build()
}
case s @ execution.ScalarSubquery(_, _) if supportedDataType(s.dataType) =>
val dataType = serializeDataType(s.dataType)
if (dataType.isEmpty) {
withInfo(s, s"Scalar subquery returns unsupported datatype ${s.dataType}")
return None
}
val builder = ExprOuterClass.Subquery
.newBuilder()
.setId(s.exprId.id)
.setDatatype(dataType.get)
Some(ExprOuterClass.Expr.newBuilder().setSubquery(builder).build())
case UnscaledValue(child) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr =
scalarFunctionExprToProtoWithReturnType("unscaled_value", LongType, childExpr)
optExprWithInfo(optExpr, expr, child)
case MakeDecimal(child, precision, scale, true) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val optExpr = scalarFunctionExprToProtoWithReturnType(
"make_decimal",
DecimalType(precision, scale),
childExpr)
optExprWithInfo(optExpr, expr, child)
case b @ BloomFilterMightContain(_, _) =>
val bloomFilter = b.left
val value = b.right
val bloomFilterExpr = exprToProtoInternal(bloomFilter, inputs, binding)
val valueExpr = exprToProtoInternal(value, inputs, binding)
if (bloomFilterExpr.isDefined && valueExpr.isDefined) {
val builder = ExprOuterClass.BloomFilterMightContain.newBuilder()
builder.setBloomFilter(bloomFilterExpr.get)
builder.setValue(valueExpr.get)
Some(
ExprOuterClass.Expr
.newBuilder()
.setBloomFilterMightContain(builder)
.build())
} else {
withInfo(expr, bloomFilter, value)
None
}
case _: Murmur3Hash => CometMurmur3Hash.convert(expr, inputs, binding)
case _: XxHash64 => CometXxHash64.convert(expr, inputs, binding)
case Sha2(left, numBits) =>
if (!numBits.foldable) {
withInfo(expr, "non literal numBits is not supported")
return None
}
// It's possible for Spark to compute the number of bits dynamically from the input
// expression; however, DataFusion does not support that yet.
val childExpr = exprToProtoInternal(left, inputs, binding)
val bits = numBits.eval().asInstanceOf[Int]
val algorithm = bits match {
case 224 => "sha224"
case 256 | 0 => "sha256"
case 384 => "sha384"
case 512 => "sha512"
case _ =>
null
}
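// Unsupported bit lengths evaluate to NULL, matching Spark's sha2 behavior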
if (algorithm == null) {
exprToProtoInternal(Literal(null, StringType), inputs, binding)
} else {
scalarFunctionExprToProtoWithReturnType(algorithm, StringType, childExpr)
}
case struct @ CreateNamedStruct(_) =>
if (struct.names.length != struct.names.distinct.length) {
withInfo(expr, "CreateNamedStruct with duplicate field names are not supported")
return None
}
val valExprs = struct.valExprs.map(exprToProtoInternal(_, inputs, binding))
if (valExprs.forall(_.isDefined)) {
val structBuilder = ExprOuterClass.CreateNamedStruct.newBuilder()
structBuilder.addAllValues(valExprs.map(_.get).asJava)
structBuilder.addAllNames(struct.names.map(_.toString).asJava)
Some(
ExprOuterClass.Expr
.newBuilder()
.setCreateNamedStruct(structBuilder)
.build())
} else {
withInfo(expr, "unsupported arguments for CreateNamedStruct", struct.valExprs: _*)
None
}
case GetStructField(child, ordinal, _) =>
exprToProtoInternal(child, inputs, binding).map { childExpr =>
val getStructFieldBuilder = ExprOuterClass.GetStructField
.newBuilder()
.setChild(childExpr)
.setOrdinal(ordinal)
ExprOuterClass.Expr
.newBuilder()
.setGetStructField(getStructFieldBuilder)
.build()
}
case CreateArray(children, _) =>
val childExprs = children.map(exprToProtoInternal(_, inputs, binding))
if (childExprs.forall(_.isDefined)) {
scalarFunctionExprToProto("make_array", childExprs: _*)
} else {
withInfo(expr, "unsupported arguments for CreateArray", children: _*)
None
}
case GetArrayItem(child, ordinal, failOnError) =>
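// Spark's GetArrayItem is zero-based, while element_at (handled below) is one-based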
val childExpr = exprToProtoInternal(child, inputs, binding)
val ordinalExpr = exprToProtoInternal(ordinal, inputs, binding)
if (childExpr.isDefined && ordinalExpr.isDefined) {
val listExtractBuilder = ExprOuterClass.ListExtract
.newBuilder()
.setChild(childExpr.get)
.setOrdinal(ordinalExpr.get)
.setOneBased(false)
.setFailOnError(failOnError)
Some(
ExprOuterClass.Expr
.newBuilder()
.setListExtract(listExtractBuilder)
.build())
} else {
withInfo(expr, "unsupported arguments for GetArrayItem", child, ordinal)
None
}
case expr if expr.prettyName == "array_insert" => convert(CometArrayInsert)
case ElementAt(child, ordinal, defaultValue, failOnError)
if child.dataType.isInstanceOf[ArrayType] =>
val childExpr = exprToProtoInternal(child, inputs, binding)
val ordinalExpr = exprToProtoInternal(ordinal, inputs, binding)
val defaultExpr = defaultValue.flatMap(exprToProtoInternal(_, inputs, binding))
if (childExpr.isDefined && ordinalExpr.isDefined &&
defaultExpr.isDefined == defaultValue.isDefined) {
val arrayExtractBuilder = ExprOuterClass.ListExtract
.newBuilder()
.setChild(childExpr.get)
.setOrdinal(ordinalExpr.get)
.setOneBased(true)
.setFailOnError(failOnError)
defaultExpr.foreach(arrayExtractBuilder.setDefaultValue(_))
Some(
ExprOuterClass.Expr
.newBuilder()
.setListExtract(arrayExtractBuilder)
.build())
} else {
withInfo(expr, "unsupported arguments for ElementAt", child, ordinal)
None
}
case GetArrayStructFields(child, _, ordinal, _, _) =>
val childExpr = exprToProtoInternal(child, inputs, binding)
if (childExpr.isDefined) {
val arrayStructFieldsBuilder = ExprOuterClass.GetArrayStructFields
.newBuilder()
.setChild(childExpr.get)
.setOrdinal(ordinal)
Some(
ExprOuterClass.Expr
.newBuilder()
.setGetArrayStructFields(arrayStructFieldsBuilder)
.build())
} else {
withInfo(expr, "unsupported arguments for GetArrayStructFields", child)
None
}
case _: ArrayRemove => convert(CometArrayRemove)
case _: ArrayContains => convert(CometArrayContains)
case _: ArrayAppend => convert(CometArrayAppend)
case _: ArrayIntersect => convert(CometArrayIntersect)
case _: ArrayJoin => convert(CometArrayJoin)
case _: ArraysOverlap => convert(CometArraysOverlap)
case _: ArrayRepeat => convert(CometArrayRepeat)
case _ @ArrayFilter(_, func) if func.children.head.isInstanceOf[IsNotNull] =>
convert(CometArrayCompact)
case _: ArrayExcept =>
convert(CometArrayExcept)
case _ =>
withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
None
}
}
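
// Usage sketch (illustrative only): serialize `a + 1` against a single-column input,
// with binding enabled so the attribute reference becomes a BoundReference by ordinal:
//
//   val a = AttributeReference("a", IntegerType, nullable = true)()
//   val proto: Option[Expr] =
//     exprToProtoInternal(Add(a, Literal(1)), Seq(a), binding = true)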