in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala [63:181]
/**
 * Selects the aggregate function implementation for the given aggregate call.
 *
 * The argument types are looked up in `inputRowType` using the indexes from the call's argument
 * list, then the call's aggregation operator is dispatched to the matching `create*` factory
 * method (or resolved directly for user-defined and bridging functions).
 *
 * @param call
 *   the aggregate call whose operator, argument list and flags drive the selection
 * @param index
 *   forwarded unchanged to the factory methods that accept it
 * @return
 *   the function implementing the aggregate call
 * @throws TableException
 *   if COUNT has more than one argument, if CUME_DIST, PERCENT_RANK or NTILE is requested while
 *   `isBounded` is false, or if the operator matches none of the handled cases
 */
def createAggFunction(call: AggregateCall, index: Int): UserDefinedFunction = {
  val argTypes: Array[LogicalType] = call.getArgList
    .map(fieldIdx => inputRowType.getChildren.get(fieldIdx))
    .toArray

  // Runs `create` only when the input is bounded; otherwise fails with the given message.
  def batchOnly(streamModeError: String)(create: => UserDefinedFunction): UserDefinedFunction =
    if (isBounded) create else throw new TableException(streamModeError)

  call.getAggregation match {
    case avg: SqlAvgAggFunction if avg.kind == SqlKind.AVG =>
      createAvgAggFunction(argTypes)

    case _: SqlSumAggFunction =>
      createSumAggFunction(argTypes, index)

    case _: SqlSumEmptyIsZeroAggFunction =>
      createSum0AggFunction(argTypes)

    case minMax: SqlMinMaxAggFunction if minMax.getKind == SqlKind.MIN =>
      createMinAggFunction(argTypes, index)

    case minMax: SqlMinMaxAggFunction if minMax.getKind == SqlKind.MAX =>
      createMaxAggFunction(argTypes, index)

    case _: SqlCountAggFunction =>
      // The checks below are order-sensitive: the multi-argument rejection comes first.
      if (call.getArgList.size() > 1) {
        throw new TableException("We now only support the count of one field.")
      } else if (call.isDistinct && call.isApproximate) {
        // TODO supports CountDistinctAggFunction
        createApproxCountDistinctAggFunction(argTypes, index)
      } else if (call.getArgList.isEmpty) {
        createCount1AggFunction(argTypes)
      } else {
        createCountAggFunction(argTypes)
      }

    case rank: SqlRankFunction if rank.getKind == SqlKind.ROW_NUMBER =>
      createRowNumberAggFunction(argTypes)

    case rank: SqlRankFunction if rank.getKind == SqlKind.RANK =>
      createRankAggFunction(argTypes)

    case rank: SqlRankFunction if rank.getKind == SqlKind.DENSE_RANK =>
      createDenseRankAggFunction(argTypes)

    case rank: SqlRankFunction if rank.getKind == SqlKind.CUME_DIST =>
      batchOnly("CUME_DIST Function is not supported in stream mode.") {
        createCumeDistAggFunction(argTypes)
      }

    case rank: SqlRankFunction if rank.getKind == SqlKind.PERCENT_RANK =>
      batchOnly("PERCENT_RANK Function is not supported in stream mode.") {
        createPercentRankAggFunction(argTypes)
      }

    case _: SqlNtileAggFunction =>
      batchOnly("NTILE Function is not supported in stream mode.") {
        createNTILEAggFUnction(argTypes)
      }

    case leadLag: SqlLeadLagAggFunction =>
      // Lead/lag has separate implementations for bounded and unbounded input.
      if (isBounded) createBatchLeadLagAggFunction(argTypes, index)
      else createStreamLeadLagAggFunction(leadLag, argTypes, index)

    case _: SqlSingleValueAggFunction =>
      createSingleValueAggFunction(argTypes)

    case firstLast: SqlFirstLastValueAggFunction if firstLast.getKind == SqlKind.FIRST_VALUE =>
      createFirstValueAggFunction(argTypes, index)

    case firstLast: SqlFirstLastValueAggFunction if firstLast.getKind == SqlKind.LAST_VALUE =>
      createLastValueAggFunction(argTypes, index)

    case _: SqlListAggFunction if call.getArgList.size() == 1 =>
      createListAggFunction(argTypes, index)

    case _: SqlListAggFunction if call.getArgList.size() == 2 =>
      createListAggWsFunction(argTypes, index)

    // TODO supports SqlCardinalityCountAggFunction

    case agg: SqlAggFunction if agg.getKind == SqlKind.COLLECT =>
      createCollectAggFunction(argTypes)

    case agg: SqlAggFunction if agg.getKind == SqlKind.ARRAY_AGG =>
      createArrayAggFunction(argTypes, call.ignoreNulls)

    case agg: SqlAggFunction if agg.getKind == SqlKind.JSON_OBJECTAGG =>
      val nullClause = agg.asInstanceOf[SqlJsonObjectAggAggFunction].getNullClause
      val absentOnNull = nullClause == SqlJsonConstructorNullClause.ABSENT_ON_NULL
      new JsonObjectAggFunction(argTypes, absentOnNull)

    case agg: SqlAggFunction if agg.getKind == SqlKind.JSON_ARRAYAGG =>
      val nullClause = agg.asInstanceOf[SqlJsonArrayAggAggFunction].getNullClause
      val absentOnNull = nullClause == SqlJsonConstructorNullClause.ABSENT_ON_NULL
      new JsonArrayAggFunction(argTypes, absentOnNull)

    case udagg: AggSqlFunction =>
      // Literal arguments cannot be touched here: Calcite has already produced them in a
      // previous RelNode, so every input is an input ref. Pass one null constant per argument.
      val nullConstants = new Array[AnyRef](argTypes.length)
      udagg.makeFunction(nullConstants, argTypes)

    case bridge: BridgingSqlAggFunction =>
      bridge.getDefinition match {
        // built-in imperative function
        case BuiltInFunctionDefinitions.PERCENTILE =>
          createPercentileAggFunction(argTypes)
        // DeclarativeAggregateFunction & UDF: the definition itself is the function
        case definition =>
          definition.asInstanceOf[UserDefinedFunction]
      }

    case unsupported: SqlAggFunction =>
      throw new TableException(s"Unsupported Function: '${unsupported.getName}'")
  }
}