in src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/ExpressionConverter.scala [78:352]
def convert(sqlTypeName: SqlTypeName, relDataType: RelDataType, op: SqlKind, opName: String, children: Seq[Any]): Any = {
op match {
case IS_NULL =>
assert(children.size == 1)
k_lit(children.head).isNull
case IS_NOT_NULL =>
assert(children.size == 1)
k_lit(children.head).isNotNull
case LIKE =>
if (children.length == 3) {
if (!children.last.isInstanceOf[java.lang.String] || children.last.asInstanceOf[java.lang.String].length != 1) {
throw new UnsupportedOperationException(
s"Invalid paramters for LIKE ESCAPE, expecting a single char for ESCAPE")
}
val escapeChar = children.last.asInstanceOf[java.lang.String].charAt(0)
k_like(k_lit(children.head), k_lit(children(1)), escapeChar)
} else if (children.length == 2) {
k_like(k_lit(children.head), k_lit(children.last))
} else {
throw new UnsupportedOperationException(
s"Invalid paramters for LIKE, expecting LIKE ... , LIKE ... ESCAPE ... ")
}
case SIMILAR =>
if (children.size == 2) {
k_similar(k_lit(children.head), k_lit(children.last))
} else if (children.size == 3) {
if (!children.last.isInstanceOf[java.lang.String] || children.last.asInstanceOf[java.lang.String].length != 1) {
throw new UnsupportedOperationException(
s"Invalid paramters for SIMILAR TO ESCAPE, expecting a single char for ESCAPE")
}
val escapeChar = children.last.asInstanceOf[java.lang.String].charAt(0)
val stringReplacedWithEscapeChar = if (!children(1).asInstanceOf[java.lang.String].contains(escapeChar)) {
children(1)
} else {
val charArray = children(1).asInstanceOf[java.lang.String].toCharArray
var escapeCharReplaced = false
val stringDeletedEscape = new StringBuilder
for (i <- 0 until charArray.length) {
if (charArray(i) != escapeChar) {
stringDeletedEscape.append(charArray(i))
escapeCharReplaced = false
} else {
if (!escapeCharReplaced) {
stringDeletedEscape.append("\\")
escapeCharReplaced = true
} else {
stringDeletedEscape.append(escapeChar)
escapeCharReplaced = false
}
}
}
stringDeletedEscape.toString()
}
k_similar(k_lit(children.head), k_lit(stringReplacedWithEscapeChar))
} else {
throw new UnsupportedOperationException(
s"Invalid paramters for SIMILAR TO, expecting SIMILAR TO ... , SIMILAR TO ... ESCAPE ... ")
}
case MINUS_PREFIX =>
assert(children.size == 1)
negate(k_lit(children.head))
case IN =>
val (columns, values) = children.map(c => k_lit(c).expr).partition {
case _: AttributeReference | _: UnresolvedAttribute => true
case _ => false
}
if (columns.size > 1) {
in(CreateStruct(columns), values)
} else {
val values = children.drop(1).map(c => k_lit(c).expr)
in(k_lit(children.head).expr, values)
}
case NOT_IN =>
val (columns, values) = children.map(c => k_lit(c).expr).partition {
case _: AttributeReference | _: UnresolvedAttribute => true
case _ => false
}
if (columns.size > 1) {
not(in(CreateStruct(columns), values))
} else {
val values = children.drop(1).map(c => k_lit(c).expr)
not(in(k_lit(children.head).expr, values))
}
case DIVIDE =>
assert(children.size == 2)
k_lit(children.head).divide(k_lit(children.last))
case CASE =>
val evens =
children.zipWithIndex.filter(p => p._2 % 2 == 0).map(p => k_lit(p._1))
val odds =
children.zipWithIndex.filter(p => p._2 % 2 == 1).map(p => k_lit(p._1))
assert(evens.length == odds.length + 1)
val zip = evens zip odds
var column: Column = null
if (zip.nonEmpty) {
column = when(zip.head._1, zip.head._2)
zip
.drop(1)
.foreach(p => {
column = column.when(p._1, p._2)
})
}
column.otherwise(evens.last)
case EXTRACT =>
val timeUnit = children.head.asInstanceOf[String]
val inputAsTS = children.apply(1)
timeUnit match {
case "YEAR" => year(k_lit(inputAsTS))
case "QUARTER" => quarter(k_lit(inputAsTS))
case "MONTH" => month(k_lit(inputAsTS))
case "WEEK" => weekofyear(k_lit(inputAsTS))
case "DOY" => dayofyear(k_lit(inputAsTS))
case "DAY" => dayofmonth(k_lit(inputAsTS))
case "DOW" => dayofweek(k_lit(inputAsTS))
case "HOUR" => hour(k_lit(inputAsTS))
case "MINUTE" => minute(k_lit(inputAsTS))
case "SECOND" => second(k_lit(inputAsTS))
case _ =>
throw new UnsupportedSparkFunctionException(
s"Unsupported function $timeUnit")
}
case REINTERPRET =>
k_lit(children.head)
case CAST =>
// all date type is long,skip is
val goalType = SparderTypeUtil.convertSqlTypeToSparkType(
relDataType)
k_lit(children.head).cast(goalType)
case TRIM =>
if (children.length == 3) {
children.head match {
case "TRAILING" =>
rtrim(k_lit(children.apply(2)), children.apply(1).asInstanceOf[String])
case "LEADING" =>
ltrim(k_lit(children.apply(2)), children.apply(1).asInstanceOf[String])
case "BOTH" =>
trim(k_lit(children.apply(2)), children.apply(1).asInstanceOf[String])
}
} else {
trim(k_lit(children.head))
}
case OTHER =>
val funcName = opName.toLowerCase
funcName match {
case "||" => concat(k_lit(children.head), k_lit(children.apply(1)))
case _ =>
throw new UnsupportedOperationException(
s"Unsupported function $funcName")
}
case OTHER_FUNCTION =>
val funcName = opName.toLowerCase
funcName match {
// math functions
case "abs" =>
abs(
k_lit(children.head).cast(SparderTypeUtil
.convertSqlTypeToSparkType(relDataType)))
case "truncate" =>
if (children.size == 1) {
k_truncate(k_lit(children.head), k_lit(0))
} else {
k_truncate(k_lit(children.head), k_lit(children.apply(1)))
}
// datetime functions
case "to_char" | "date_format" =>
val part = k_lit(children.apply(1)).toString().toUpperCase match {
case "YEAR" => "y"
case "MONTH" => "M"
case "DAY" => "d"
case "HOUR" => "h"
case "MINUTE" => "m"
case "MINUTES" => "m"
case "SECOND" => "s"
case "SECONDS" => "s"
case _ => k_lit(children.apply(1)).toString()
}
date_format(k_lit(children.head), part)
case func if sparkUdfSet.contains(func) =>
call_udf(func, children.map(k_lit): _*)
case func if bitmapUDF.contains(func) =>
func match {
case "intersect_count_by_col" =>
new Column(IntersectCountByCol(children.head.asInstanceOf[Column].expr.children))
case "subtract_bitmap_value" | "subtract_bitmap_uuid_value_all" =>
new Column(SubtractBitmapAllValues(children.head.asInstanceOf[Column].expr,
children.last.asInstanceOf[Column].expr,
KylinConfig.getInstanceFromEnv.getBitmapValuesUpperBound))
case "subtract_bitmap_uuid" | "subtract_bitmap_uuid_distinct" =>
new Column(SubtractBitmapUUID(children.head.asInstanceOf[Column].expr,
children.last.asInstanceOf[Column].expr))
case "subtract_bitmap_uuid_count" =>
new Column(SubtractBitmapCount(children.head.asInstanceOf[Column].expr,
children.last.asInstanceOf[Column].expr))
case "subtract_bitmap_uuid_value" =>
val left = children.head.asInstanceOf[Column].expr
val right = children.get(1).asInstanceOf[Column].expr
val limitArg = children.get(2)
val offsetArg = children.get(3)
val limit = limitArg.toString.toInt
val offset = offsetArg.toString.toInt
if (limit < 0 || offset < 0) {
throw new UnsupportedOperationException(s"both limit and offset must be >= 0")
}
new Column(SubtractBitmapValue(left, right, limit, offset,
KylinConfig.getInstanceFromEnv.getBitmapValuesUpperBound))
case "bitmap_uuid_to_array" =>
new Column(BitmapUuidToArray(children.head.asInstanceOf[Column].expr))
}
case _ =>
throw new UnsupportedOperationException(
s"Unsupported function $funcName")
}
case CEIL =>
if (children.length == 1) {
ceil(k_lit(children.head))
} else if (children.length == 2) {
call_udf("ceil_datetime", children.map(k_lit): _*)
} else {
throw new UnsupportedOperationException(
s"ceil must provide one or two parameters under sparder")
}
case FLOOR =>
if (children.length == 1) {
floor(k_lit(children.head))
} else if (children.length == 2) {
date_trunc(children.apply(1).toString, k_lit(children.head))
} else {
throw new UnsupportedOperationException(
s"floor must provide one or two parameters under sparder")
}
case ARRAY_VALUE_CONSTRUCTOR =>
array(children.map(child => k_lit(child)): _*)
case ROW =>
call_udf("named_struct", children.zipWithIndex.flatMap { case (child, i) =>
Seq(lit(s"col${i + 1}"), k_lit(child))
}: _*)
case IS_NOT_DISTINCT_FROM =>
k_lit(children.head).eqNullSafe(k_lit(children.apply(1)))
// TDVT SQL626 - null compare with true/false is special
case IS_TRUE =>
// false and null => false
val col = k_lit(children.head)
col.isNotNull && col
case IS_NOT_TRUE =>
// true or not null => true or null => true
val col = k_lit(children.head)
col.isNull || !col
case IS_FALSE =>
// false and not null => false and null => false
val col = k_lit(children.head)
col.isNotNull && !col
case IS_NOT_FALSE =>
// true or null => true
val col = k_lit(children.head)
col.isNull || col
// see https://olapio.atlassian.net/browse/KE-42036
// Calcite 1.30 changed SqlKind.OTHER_FUNCTION with SqlKind.POSITION in SqlPositionFunction
case POSITION => //position(substr,str,start)
call_udf("position", children.map(k_lit): _*)
// see https://olapio.atlassian.net/browse/KE-42354
// Calcite 1.30 changed SqlKind.OTHER_FUNCTION with SqlKind.ITEM in SqlItemOperator
case ITEM =>
element_at(k_lit(children.head), k_lit(children.apply(1).asInstanceOf[Int] + 1))
// Calcite 1.30 changed the if operator to case, eliminate this change
case IF =>
call_udf("if", children.map(k_lit): _*)
case NVL =>
call_udf("ifnull", children.map(k_lit): _*)
case unsupportedFunc =>
throw new UnsupportedOperationException(unsupportedFunc.toString)
}
}