in src/spark-project/engine-spark/src/main/scala/org/apache/kylin/engine/spark/builder/NonEquiJoinConditionBuilder.scala [69:218]
private def doConvert(condDesc: NonEquiJoinCondition): Any = {
val children = condDesc.getOperands.map(in => doConvert(in))
def convertBinary: (Column, Column) = {
assert(children.length == 2)
(k_col(children.head), k_col(children.last))
}
// TODO: get rid of RelDataType
val dataType = condDesc.getDataType
val relDataType =
if (dataType.getTypeName.allowsPrecScale(true, true)) {
new BasicSqlType(typeSystem, dataType.getTypeName, dataType.getPrecision, dataType.getScale)
} else if (dataType.getTypeName.allowsPrec()) {
new BasicSqlType(typeSystem, dataType.getTypeName, dataType.getPrecision)
} else {
new BasicSqlType(typeSystem, dataType.getTypeName)
}
val sqlTypeName = condDesc.getDataType.getTypeName
val op = condDesc.getOp
val opName = condDesc.getOpName
op match {
case INPUT_REF =>
col(NSparkCubingUtil.convertFromDot(condDesc.getColRef.getBackTickIdentity))
case LITERAL =>
getLiteral(condDesc.getTypedValue, relDataType)
case AND =>
condDesc.getOperands.map(childCond => convert(childCond)).reduce(_ && _)
case OR =>
condDesc.getOperands.map(childCond => convert(childCond)).reduce(_ || _)
case NOT =>
assert(condDesc.getOperands.length == 1)
not(condDesc.getOperands.map(childCond => convert(childCond)).head)
case EQUALS =>
val (left: Column, right: Column) = convertBinary
left === right
case GREATER_THAN =>
val (left: Column, right: Column) = convertBinary
left > right
case LESS_THAN =>
val (left: Column, right: Column) = convertBinary
left < right
case GREATER_THAN_OR_EQUAL =>
val (left: Column, right: Column) = convertBinary
left >= right
case LESS_THAN_OR_EQUAL =>
val (left: Column, right: Column) = convertBinary
left <= right
case NOT_EQUALS =>
val (left: Column, right: Column) = convertBinary
left =!= right
case PLUS =>
assert(children.length == 2)
sqlTypeName match {
case SqlTypeName.DATE =>
k_lit(children.head)
.cast(TimestampType)
.cast(LongType)
.plus(k_lit(children.last))
.cast(TimestampType)
.cast(DateType)
case SqlTypeName.TIMESTAMP =>
k_lit(children.head)
.cast(TimestampType)
.cast(LongType)
.plus(k_lit(children.last))
.cast(TimestampType)
case _ =>
k_lit(children.head)
.plus(k_lit(children.last))
}
case MINUS =>
assert(children.length == 2)
if (sqlTypeName == SqlTypeName.DATE || sqlTypeName == SqlTypeName.TIMESTAMP) {
sqlTypeName match {
case SqlTypeName.DATE =>
return k_lit(children.head).cast(TimestampType).cast(LongType).minus(lit(children.last)).cast(TimestampType).cast(DateType)
case SqlTypeName.TIMESTAMP =>
return k_lit(children.head)
.cast(LongType)
.minus(k_lit(children.last))
.cast(TimestampType)
case _ =>
}
val timeUnitName = relDataType
.asInstanceOf[IntervalSqlType]
.getIntervalQualifier
.timeUnitRange
.name
if ("DAY".equalsIgnoreCase(timeUnitName)
|| "SECOND".equalsIgnoreCase(timeUnitName)
|| "HOUR".equalsIgnoreCase(timeUnitName)
|| "MINUTE".equalsIgnoreCase(timeUnitName)) {
// for ADD_DAY case
// the calcite plan looks like: /INT(Reinterpret(-($0, 2012-01-01)), 86400000)
// and the timeUnitName is DAY
// for ADD_WEEK case
// the calcite plan looks like: /INT(CAST(/INT(Reinterpret(-($0, 2000-01-01)), 1000)):INTEGER, 604800)
// and the timeUnitName is SECOND
// for MINUTE case
// the Calcite plan looks like: CAST(/INT(Reinterpret(-($1, CAST($0):TIMESTAMP(0))), 60000)):INTEGER
// for HOUR case
// the Calcite plan looks like: CAST(/INT(Reinterpret(-($1, CAST($0):TIMESTAMP(0))), 3600000)):INTEGER
// expecting ts instead of seconds
// so we need to multiply 1000 here
val ts1 = k_lit(children.head).cast(TimestampType).cast(LongType) // col
val ts2 = k_lit(children.last).cast(TimestampType).cast(LongType) // lit
ts1.minus(ts2).multiply(1000)
} else if ("MONTH".equalsIgnoreCase(timeUnitName) || "YEAR"
.equalsIgnoreCase(timeUnitName)) {
// for ADD_YEAR case,
// the calcite plan looks like: CAST(/INT(Reinterpret(-($0, 2000-03-01)), 12)):INTEGER
// and the timeUnitName is YEAR
// for ADD_QUARTER case
// the calcite plan looks like: /INT(CAST(Reinterpret(-($0, 2000-01-01))):INTEGER, 3)
// and the timeUnitName is MONTH
// for ADD_MONTH case
val ts1 = k_lit(children.head).cast(TimestampType)
val ts2 = k_lit(children.last).cast(TimestampType)
k_subtract_months(ts1, ts2)
} else {
throw new IllegalStateException(
"Unsupported SqlInterval: " + timeUnitName)
}
} else {
k_lit(children.head).minus(k_lit(children.last))
}
case TIMES =>
assert(children.length == 2)
k_lit(children.head).multiply(k_lit(children.last))
case MOD =>
assert(children.size == 2)
val (left: Column, right: Any) = convertBinary
left mod right
case _ =>
ExpressionConverter.convert(sqlTypeName, relDataType, op, opName, children)
}
}