def createAggFunction()

in flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala [63:181]


  /**
   * Resolves the [[UserDefinedFunction]] that implements the given aggregate call.
   *
   * The argument types are looked up in `inputRowType` using the field indexes in
   * `call.getArgList`; the implementation is then selected by matching on
   * `call.getAggregation`. The cases are order-sensitive: guarded cases for the same
   * function class (e.g. the COUNT variants) must stay ahead of their unguarded fallback.
   *
   * @param call
   *   the aggregate call to create a function for
   * @param index
   *   forwarded unchanged to the helper factories that take it
   * @return
   *   the function implementing `call`
   * @throws TableException
   *   if COUNT is called with more than one argument, if CUME_DIST, PERCENT_RANK or NTILE
   *   is requested while `isBounded` is false, or if the aggregation is not supported
   */
  def createAggFunction(call: AggregateCall, index: Int): UserDefinedFunction = {

    // Logical type of every argument, in the order the call lists its argument indexes.
    val argTypes: Array[LogicalType] = call.getArgList
      .map(inputRowType.getChildren.get(_))
      .toArray

    call.getAggregation match {
      case a: SqlAvgAggFunction if a.kind == SqlKind.AVG => createAvgAggFunction(argTypes)

      case _: SqlSumAggFunction => createSumAggFunction(argTypes, index)

      case _: SqlSumEmptyIsZeroAggFunction => createSum0AggFunction(argTypes)

      case a: SqlMinMaxAggFunction if a.getKind == SqlKind.MIN =>
        createMinAggFunction(argTypes, index)

      case a: SqlMinMaxAggFunction if a.getKind == SqlKind.MAX =>
        createMaxAggFunction(argTypes, index)

      // The multi-argument check must precede every other COUNT case.
      case _: SqlCountAggFunction if call.getArgList.size() > 1 =>
        throw new TableException("We now only support the count of one field.")

      // TODO supports CountDistinctAggFunction
      case _: SqlCountAggFunction if call.isDistinct && call.isApproximate =>
        createApproxCountDistinctAggFunction(argTypes, index)

      case _: SqlCountAggFunction if call.getArgList.isEmpty => createCount1AggFunction(argTypes)

      case _: SqlCountAggFunction => createCountAggFunction(argTypes)

      case a: SqlRankFunction if a.getKind == SqlKind.ROW_NUMBER =>
        createRowNumberAggFunction(argTypes)

      case a: SqlRankFunction if a.getKind == SqlKind.RANK =>
        createRankAggFunction(argTypes)

      case a: SqlRankFunction if a.getKind == SqlKind.DENSE_RANK =>
        createDenseRankAggFunction(argTypes)

      case a: SqlRankFunction if a.getKind == SqlKind.CUME_DIST =>
        if (isBounded) {
          createCumeDistAggFunction(argTypes)
        } else {
          throw new TableException("CUME_DIST Function is not supported in stream mode.")
        }

      case a: SqlRankFunction if a.getKind == SqlKind.PERCENT_RANK =>
        if (isBounded) {
          createPercentRankAggFunction(argTypes)
        } else {
          throw new TableException("PERCENT_RANK Function is not supported in stream mode.")
        }

      case _: SqlNtileAggFunction =>
        if (isBounded) {
          createNTILEAggFUnction(argTypes)
        } else {
          throw new TableException("NTILE Function is not supported in stream mode.")
        }

      // LEAD/LAG is the only case here with separate bounded and unbounded implementations.
      case func: SqlLeadLagAggFunction =>
        if (isBounded) {
          createBatchLeadLagAggFunction(argTypes, index)
        } else {
          createStreamLeadLagAggFunction(func, argTypes, index)
        }

      case _: SqlSingleValueAggFunction =>
        createSingleValueAggFunction(argTypes)

      case a: SqlFirstLastValueAggFunction if a.getKind == SqlKind.FIRST_VALUE =>
        createFirstValueAggFunction(argTypes, index)

      case a: SqlFirstLastValueAggFunction if a.getKind == SqlKind.LAST_VALUE =>
        createLastValueAggFunction(argTypes, index)

      // LISTAGG with any other argument count falls through to the unsupported case below.
      case _: SqlListAggFunction if call.getArgList.size() == 1 =>
        createListAggFunction(argTypes, index)

      case _: SqlListAggFunction if call.getArgList.size() == 2 =>
        createListAggWsFunction(argTypes, index)

      // TODO supports SqlCardinalityCountAggFunction

      case a: SqlAggFunction if a.getKind == SqlKind.COLLECT =>
        createCollectAggFunction(argTypes)

      case a: SqlAggFunction if a.getKind == SqlKind.ARRAY_AGG =>
        createArrayAggFunction(argTypes, call.ignoreNulls)

      // Matched by type rather than downcast after a kind check, so a function that merely
      // reports this kind ends up in the unsupported case instead of a ClassCastException.
      case fn: SqlJsonObjectAggAggFunction if fn.getKind == SqlKind.JSON_OBJECTAGG =>
        val onNull = fn.getNullClause
        new JsonObjectAggFunction(argTypes, onNull == SqlJsonConstructorNullClause.ABSENT_ON_NULL)

      case fn: SqlJsonArrayAggAggFunction if fn.getKind == SqlKind.JSON_ARRAYAGG =>
        val onNull = fn.getNullClause
        new JsonArrayAggFunction(argTypes, onNull == SqlJsonConstructorNullClause.ABSENT_ON_NULL)

      case udagg: AggSqlFunction =>
        // Can not touch the literals, Calcite make them in previous RelNode.
        // In here, all inputs are input refs, so every constant slot stays null
        // (a freshly allocated reference array is all nulls).
        val constants = new Array[AnyRef](argTypes.length)
        udagg.makeFunction(constants, argTypes)

      case bridge: BridgingSqlAggFunction =>
        bridge.getDefinition match {
          // built-in imperativeFunction
          case BuiltInFunctionDefinitions.PERCENTILE =>
            createPercentileAggFunction(argTypes)
          // DeclarativeAggregateFunction & UDF
          case udf: UserDefinedFunction =>
            udf
          // Any other definition cannot be used as an aggregate implementation here.
          case _ =>
            throw new TableException(s"Unsupported Function: '${bridge.getName}'")
        }

      case unSupported: SqlAggFunction =>
        throw new TableException(s"Unsupported Function: '${unSupported.getName}'")
    }
  }