in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRelMdUtil.scala [817:895]
/**
 * Estimates the number of distinct values produced by the given call when evaluated by `calc`.
 *
 * The estimate is chosen by the kind of the call:
 *  - unary minus: the estimate of its single operand;
 *  - plus / minus: the larger of the two operand estimates (unknown if either is unknown);
 *  - times / divide: `NumberUtil.multiply` of the two operand estimates;
 *  - EXTRACT with a literal time unit: the size of the value domain of that unit;
 *  - any other single-operand call: the estimate of that operand;
 *  - anything else: one tenth of the row count of `calc`, when the row count is known.
 *
 * The raw estimate is finally passed through `FlinkRelMdUtil.numDistinctVals` together with the
 * row count of `calc`.
 *
 * @param call the call expression to estimate
 * @return the estimated number of distinct values, or null when no estimate can be made
 */
override def visitCall(call: RexCall): JDouble = {
  val rowCount = mq.getRowCount(calc)
  val operands = call.getOperands

  // Domain size of EXTRACT(unit FROM time). Unit definitions follow
  // https://www.postgresql.org/docs/9.1/static/functions-datetime.html#FUNCTIONS-DATETIME-EXTRACT
  def cardOfExtract: JDouble = {
    val timeOperand = operands.get(1)
    operands.get(0) match {
      case unit: RexLiteral =>
        val unitValue = unit.getValue
        val timeOperandType = timeOperand.getType.getSqlTypeName

        // TIMESTAMP and TIME operands get the fine-grained estimate, every other type the
        // coarse one.
        def byPrecision(fine: Double, coarse: Double): JDouble = timeOperandType match {
          case TIMESTAMP | TIME => fine
          case _ => coarse
        }

        // assume min time is 1970-01-01 00:00:00, max time is 2100-12-31 21:59:59,
        // i.e. a span of roughly 130 years
        unitValue match {
          case YEAR => 130d // [1970, 2100]
          case QUARTER => 4d
          case MONTH => 12d
          case WEEK => 53d // [1, 53]
          case DAY => 31d
          case DOW => 7d // [0, 6]
          case DOY => 366d // [1, 366]
          case HOUR => 24d
          case MINUTE => 60d
          case SECOND => byPrecision(60 * 1000d, 60d) // [0.000, 59.999] or [0, 59]
          case MILLISECOND => byPrecision(60 * 1000d, 60d) // [0, 59999] or [0, 59]
          case MICROSECOND => byPrecision(60 * 1000d * 1000d, 60d) // [0, 59999999] or [0, 59]
          case EPOCH =>
            // the number of seconds since 1970-01-01 00:00:00 UTC over the assumed ~130 years;
            // computed in Double because the product does not fit into an Int
            val epochSeconds = 130d * 365 * 24 * 60 * 60
            byPrecision(epochSeconds * 1000d, epochSeconds)
          case DECADE => 13d // The year field divided by 10
          case CENTURY => 2d
          case MILLENNIUM => 2d
          case _ => cardOfCalcExpr(mq, calc, timeOperand)
        }
      // non-literal unit: fall back to the estimate of the time operand itself
      case _ => cardOfCalcExpr(mq, calc, timeOperand)
    }
  }

  val distinctRowCount: JDouble =
    if (call.isA(SqlKind.MINUS_PREFIX)) {
      cardOfCalcExpr(mq, calc, operands.get(0))
    } else if (call.isA(ImmutableList.of(SqlKind.PLUS, SqlKind.MINUS))) {
      val card0 = cardOfCalcExpr(mq, calc, operands.get(0))
      if (card0 == null) {
        // the second operand is not visited when the first one is already unknown
        null
      } else {
        val card1 = cardOfCalcExpr(mq, calc, operands.get(1))
        if (card1 == null) null else Math.max(card0, card1)
      }
    } else if (call.isA(ImmutableList.of(SqlKind.TIMES, SqlKind.DIVIDE))) {
      NumberUtil.multiply(
        cardOfCalcExpr(mq, calc, operands.get(0)),
        cardOfCalcExpr(mq, calc, operands.get(1)))
    } else if (call.isA(SqlKind.EXTRACT)) {
      cardOfExtract
    } else if (operands.size() == 1) {
      cardOfCalcExpr(mq, calc, operands.get(0))
    } else {
      if (rowCount != null) rowCount / 10 else null
    }

  // The row count may be unknown (null) even when a branch above produced a fixed estimate;
  // report "unknown" in that case instead of handing a null to numDistinctVals.
  if (distinctRowCount == null || rowCount == null) {
    null
  } else {
    FlinkRelMdUtil.numDistinctVals(distinctRowCount, rowCount)
  }
}