in src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/SparderRexVisitor.scala [55:312]
def this(dfs: Array[DataFrame],
rowType: RelDataType,
dataContext: DataContext) = this(dfs.flatMap(df => df.schema.fieldNames), rowType, dataContext)
def this(df: DataFrame,
rowType: RelDataType,
dataContext: DataContext) = this(Array(df), rowType, dataContext)
def this(plan: LogicalPlan,
rowType: RelDataType,
dataContext: DataContext) = this(plan.output.map(c => c.name), rowType, dataContext)
// scalastyle:off
override def visitCall(call: RexCall): Any = {
val children = new ListBuffer[Any]()
var isDateType = false
var isTimeType = false
for (operand <- call.operands.asScala) {
if (operand.getType.getSqlTypeName.name().equals("DATE")) {
isDateType = true
}
if (operand.getType.getSqlTypeName.name().equals("TIMESTAMP")) {
isTimeType = true
}
val childFilter = operand.accept(this)
children += childFilter
}
def getColumns: ListBuffer[Column] = {
children.map {
case null => new Column(Literal(null, DataTypes.BooleanType))
// see https://olapio.atlassian.net/browse/KE-42088
// Calcite 1.30 change child type
case boolCol: Boolean => new Column(Literal(boolCol.booleanValue(), DataTypes.BooleanType))
case child =>
assert(child.isInstanceOf[Column])
child.asInstanceOf[Column]
}
}
def getOperands: (Column, Column) = {
var left = k_lit(children.head)
var right = k_lit(children.last)
// get the lit pos.
// ($1, "2010-01-01 15:43:38") pos:1
// ("2010-01-01 15:43:38", $1) pos:0
val litPos = call.getOperands.asScala.zipWithIndex
.filter(!_._1.isInstanceOf[RexInputRef])
.map(_._2)
if (isDateType) {
litPos
.foreach {
case 0 => left = left.cast(TimestampType).cast(DateType)
case 1 => right = right.cast(TimestampType).cast(DateType)
}
}
if (isTimeType) {
litPos
.foreach {
case 0 => left = left.cast(TimestampType)
case 1 => right = right.cast(TimestampType)
}
}
(left, right)
}
val op = call.getOperator
op.getKind match {
case AND =>
getColumns.reduce {
_.and(_)
}
case OR =>
getColumns.reduce {
_.or(_)
}
case NOT =>
assert(children.size == 1)
children.foreach(filter => assert(filter.isInstanceOf[Column]))
not(children.head.asInstanceOf[Column])
case EQUALS =>
assert(children.size == 2)
val (left: Column, right: Column) = getOperands
left === right
case GREATER_THAN =>
assert(children.size == 2)
val (left: Column, right: Column) = getOperands
left > right
case LESS_THAN =>
assert(children.size == 2)
val (left: Column, right: Column) = getOperands
left < right
case GREATER_THAN_OR_EQUAL =>
assert(children.size == 2)
val (left: Column, right: Column) = getOperands
left >= right
case LESS_THAN_OR_EQUAL =>
assert(children.size == 2)
val (left: Column, right: Column) = getOperands
left <= right
case NOT_EQUALS =>
assert(children.size == 2)
val (left: Column, right: Column) = getOperands
left =!= right
case PLUS =>
assert(children.size == 2)
// see https://olapio.atlassian.net/browse/KE-42035
// Calcite 1.30 changed the name of operand in the SqlDatetimePlusOperator class,
// instead of using "DATETIME_PLUS", use "+"
if (op.getName.equals("+")) {
// scalastyle:off
children.last match {
case num: MonthNum => {
// both add_month and add_year case
val ts = k_lit(children.head).cast(TimestampType)
call.getType.getSqlTypeName match {
case SqlTypeName.DATE =>
return k_lit(k_add_months(k_lit(ts), num.num)).cast(DateType)
case SqlTypeName.TIMESTAMP =>
return k_lit(k_add_months(k_lit(ts), num.num)).cast(TimestampType)
case _ =>
return k_lit(k_add_months(k_lit(ts), num.num))
}
}
case _ =>
}
call.getType.getSqlTypeName match {
case SqlTypeName.CHAR | SqlTypeName.VARCHAR | SqlTypeName.BINARY | SqlTypeName.VARBINARY =>
return k_lit(children.head)
.cast(TimestampType)
.cast(LongType)
.plus(k_lit(children.last))
.cast(TimestampType)
.cast(StringType)
case _ =>
}
}
call.getType.getSqlTypeName match {
case SqlTypeName.DATE =>
k_lit(children.head)
.cast(TimestampType)
.cast(LongType)
.plus(k_lit(children.last))
.cast(TimestampType)
.cast(DateType)
case SqlTypeName.TIMESTAMP =>
k_lit(children.head)
.cast(TimestampType)
.cast(LongType)
.plus(k_lit(children.last))
.cast(TimestampType)
case _ =>
k_lit(children.head)
.plus(k_lit(children.last))
}
case MINUS =>
assert(children.size == 2)
if (op.isInstanceOf[SqlDatetimeSubtractionOperator]) {
call.getType.getSqlTypeName match {
case SqlTypeName.DATE =>
return k_lit(children.head).cast(TimestampType).cast(LongType).minus(lit(children.last)).cast(TimestampType).cast(DateType)
case SqlTypeName.TIMESTAMP =>
return k_lit(children.head)
.cast(LongType)
.minus(k_lit(children.last))
.cast(TimestampType)
case _ =>
}
val timeUnitName = call.`type`
.asInstanceOf[IntervalSqlType]
.getIntervalQualifier
.timeUnitRange
.name
if ("DAY".equalsIgnoreCase(timeUnitName)
|| "SECOND".equalsIgnoreCase(timeUnitName)
|| "HOUR".equalsIgnoreCase(timeUnitName)
|| "MINUTE".equalsIgnoreCase(timeUnitName)) {
// for ADD_DAY case
// the calcite plan looks like: /INT(Reinterpret(-($0, 2012-01-01)), 86400000)
// and the timeUnitName is DAY
// for ADD_WEEK case
// the calcite plan looks like: /INT(CAST(/INT(Reinterpret(-($0, 2000-01-01)), 1000)):INTEGER, 604800)
// and the timeUnitName is SECOND
// for MINUTE case
// the Calcite plan looks like: CAST(/INT(Reinterpret(-($1, CAST($0):TIMESTAMP(0))), 60000)):INTEGER
// for HOUR case
// the Calcite plan looks like: CAST(/INT(Reinterpret(-($1, CAST($0):TIMESTAMP(0))), 3600000)):INTEGER
// expecting ts instead of seconds
// so we need to multiply 1000 here
val ts1 = k_lit(children.head).cast(TimestampType).cast(LongType) //col
val ts2 = k_lit(children.last).cast(TimestampType).cast(LongType) //lit
ts1.minus(ts2).multiply(1000)
} else if ("MONTH".equalsIgnoreCase(timeUnitName) || "YEAR"
.equalsIgnoreCase(timeUnitName)) {
// for ADD_YEAR case,
// the calcite plan looks like: CAST(/INT(Reinterpret(-($0, 2000-03-01)), 12)):INTEGER
// and the timeUnitName is YEAR
// for ADD_QUARTER case
// the calcite plan looks like: /INT(CAST(Reinterpret(-($0, 2000-01-01))):INTEGER, 3)
// and the timeUnitName is MONTH
// for ADD_MONTH case
val ts1 = k_lit(children.head).cast(TimestampType)
val ts2 = k_lit(children.last).cast(TimestampType)
k_subtract_months(ts1, ts2)
} else {
throw new IllegalStateException(
"Unsupported SqlInterval: " + timeUnitName)
}
} else {
k_lit(children.head).minus(k_lit(children.last))
}
case TIMES =>
assert(children.size == 2)
children.head match {
case num: MonthNum => {
val ts = k_lit(children.apply(1)).cast(TimestampType).cast(LongType)
MonthNum(k_lit(ts).multiply(k_lit(num.num)))
}
case _ =>
k_lit(children.head).multiply(k_lit(children.last))
}
case MOD =>
assert(children.size == 2)
val (left: Column, right: Any) = getOperands
left mod right
case DIVIDE =>
assert(children.size == 2)
var divisionResult = k_lit(children.head).divide(k_lit(children.last))
if (op.isInstanceOf[SqlDateTimeDivisionOperator]) {
divisionResult = divisionResult.cast(LongType)
}
divisionResult
case COALESCE =>
assert(children.size == 2)
coalesce(children.apply(0).asInstanceOf[Column], k_lit(children.apply(1)))
case _ =>
ExpressionConverter.convert(call.getType.getSqlTypeName, call.`type`, op.getKind, op.getName, children)
}
}