def buildAgg()

in src/spark-project/sparder/src/main/scala/org/apache/kylin/query/runtime/plan/AggregatePlan.scala [153:358]


  /**
   * Translates every rewritten aggregate call of `rel` into an aliased Spark aggregate [[Column]].
   *
   * Calls are processed in the order returned by `rel.getRewriteAggCalls`. Each resulting column is
   * aliased with the name produced by `SchemaProcessor.replaceToAggravateSchemaName` from the call's
   * position, its function name, the identity hash of `rel` and the argument column names.
   *
   * Two kinds of calls are handled:
   *  - a [[KylinAggregateCall]] whose function name is in `binaryMeasureType` is mapped onto the
   *    matching `KapFunctions` measure function (count distinct, bitmap uuid/build, intersect
   *    functions, percentile, sum_lc), falling back to the UDF registered through `RuntimeHelper`;
   *  - any other call is mapped onto a Spark SQL aggregate (sum, count, max, min, count distinct,
   *    grouping, collect_set, ...) or onto a `KapFunctions` bitmap helper.
   *
   * @param schema input attributes; aggregate argument indexes are resolved against this sequence
   * @param rel    the aggregate rel whose rewritten calls are translated
   * @param plan   the child logical plan; it is only resolved into a DataFrame for intersect functions,
   *               to look up the data type of the second argument (used to cast the third one to an array)
   * @return one aliased aggregate column per rewritten aggregate call, in call order
   * @throws IllegalArgumentException      if an argument-count `require` fails or the function is unknown
   * @throws UnsupportedOperationException for unsupported intersect functions, or percentile / bitmap-page
   *                                       parameters that are not constant literals
   */
  def buildAgg(schema: Seq[Attribute],
               rel: OlapAggregateRel,
               plan: LogicalPlan): List[Column] = {
    // Identity hash of this rel instance; it is folded into every alias name.
    val hash = System.identityHashCode(rel).toString

    rel.getRewriteAggCalls.asScala.zipWithIndex.map {
      // Pre-computed (binary) measures answered through Kylin's own aggregate functions.
      case (call: KylinAggregateCall, index: Int)
        if binaryMeasureType.contains(OlapAggregateRel.getAggrFuncName(call)) =>
        val dataType = call.getFunc.getReturnDataType
        val isCount = call.getFunc.isCount
        val funcName =
          if (isCount) FunctionDesc.FUNC_COUNT else OlapAggregateRel.getAggrFuncName(call)
        val argNames = call.getArgList.asScala.map(schema.apply(_).name)
        val columnName = argNames.map(name => col(name))
        val registeredFuncName = RuntimeHelper.registerSingleByColName(funcName, dataType)
        val aggName = SchemaProcessor.replaceToAggravateSchemaName(index, funcName, hash, argNames: _*)
        if (funcName == FunctionDesc.FUNC_COUNT_DISTINCT) {
          // "hllc" return type -> approximate count distinct; any other type -> precise (bitmap) count distinct.
          if (dataType.getName == "hllc") {
            org.apache.spark.sql.KapFunctions
              .approx_count_distinct(columnName.head, dataType.getPrecision)
              .alias(aggName)
          } else {
            KapFunctions.precise_count_distinct(columnName.head).alias(aggName)
          }
        } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_BITMAP_UUID)) {
          KapFunctions.precise_bitmap_uuid(columnName.head).alias(aggName)
        } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_BITMAP_BUILD)) {
          KapFunctions.precise_bitmap_build(columnName.head).alias(aggName)
        } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_INTERSECT_COUNT)) {
          require(columnName.size >= 3, s"Input columns size ${columnName.size} don't greater than or equal to 3.")
          val resolvedPlan = SparkInternalAgent.getDataFrame(SparderEnv.getSparkSession, plan)
          // The third argument is cast to an array of the second argument's resolved data type.
          val columns = columnName.slice(0, 3).zipWithIndex.map {
            case (column: Column, 2) => column.cast(ArrayType.apply(resolvedPlan.schema.apply(call.getArgList.get(1)).dataType))
            case (column: Column, _) => column
          }
          val separator = s"\\${KylinConfig.getInstanceFromEnv.getIntersectFilterOrSeparator}"
          val upperBound = KylinConfig.getInstanceFromEnv.getBitmapValuesUpperBound
          // The concrete intersect variant is chosen by the original call name; the *_V2 variants
          // additionally receive the last argument column.
          call.name.toUpperCase(Locale.ROOT) match {
            case FunctionDesc.FUNC_INTERSECT_COUNT => KapFunctions.intersect_count(separator, upperBound, columns.toList: _*).alias(aggName)
            case FunctionDesc.FUNC_INTERSECT_VALUE => KapFunctions.intersect_value(separator, upperBound, columns.toList: _*).alias(aggName)
            case FunctionDesc.FUNC_INTERSECT_BITMAP_UUID => KapFunctions.intersect_bitmap(separator, upperBound, columns.toList: _*).alias(aggName)
            case FunctionDesc.FUNC_INTERSECT_COUNT_V2 => KapFunctions.intersect_count_v2(columnName.last, separator, upperBound, columns.toList: _*).alias(aggName)
            case FunctionDesc.FUNC_INTERSECT_VALUE_V2 => KapFunctions.intersect_value_v2(columnName.last, separator, upperBound, columns.toList: _*).alias(aggName)
            case FunctionDesc.FUNC_INTERSECT_BITMAP_UUID_V2 => KapFunctions.intersect_bitmap_v2(columnName.last, separator, upperBound, columns.toList: _*).alias(aggName)
            case func => throw new UnsupportedOperationException(s"Unsupported intersect count function: $func, please check the sql.")
          }
        } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_PERCENTILE)) {
          require(columnName.size == 2, s"Input columns size ${columnName.size} don't equal to 2.")
          KapFunctions.k_percentile(columnName.head, columnName(1), dataType.getPrecision).alias(aggName)
        } else if (funcName.equalsIgnoreCase(FunctionDesc.FUNC_SUM_LC)) {
          KapFunctions.k_sum_lc(columnName.head, SparderTypeUtil.toSparkType(dataType)).alias(aggName)
        } else {
          callUDF(registeredFuncName, columnName.toList: _*).alias(aggName)
        }
      // Everything else is computed directly with Spark SQL aggregates over the input columns.
      case (call: Any, index: Int) =>
        val funcName = OlapAggregateRel.getAggrFuncName(call)
        val argNames = call.getArgList.asScala.map(id => schema.apply(id).name)
        val columnName = argNames.map(name => col(name))
        val inputType = call.getType
        val aggName = SchemaProcessor.replaceToAggravateSchemaName(index,
          funcName,
          hash,
          argNames: _*)
        funcName match {
          case FunctionDesc.FUNC_PERCENTILE =>
            // percentage (argument 1) and the optional accuracy (argument 2) must be constant literals
            // projected by the child rel; anything else is rejected with the same error.
            def invalidPercentileArgs = new UnsupportedOperationException(s"Invalid percentile_approx parameters, " +
              s"expecting approx_percentile(col, percentage [, accuracy]), percentage/accuracy must be of constant literal")
            rel.getInput match {
              case projectRel: OlapProjectRel =>
                val percentageArg = projectRel.getProjects.get(call.getArgList.get(1))
                val accuracyArg = if (call.getArgList.size() < 3) {
                  None
                } else {
                  Some(projectRel.getProjects.get(call.getArgList.get(2)))
                }
                percentageArg match {
                  case percentageLitRex: RexLiteral =>
                    if (KylinConfig.getInstanceFromEnv.getPercentileApproxAlgorithm.equalsIgnoreCase("t-digest")) {
                      // t-digest path: the accuracy argument is not used.
                      KapFunctions.k_percentile(columnName.head, columnName(1), PercentileCounter.DEFAULT_PERCENTILE_ACCURACY).alias(aggName)
                    } else {
                      val percentage = percentageLitRex.getValue
                      // Match the accuracy explicitly: a type pattern on Option[RexLiteral] is erased
                      // and would let a non-literal through, failing later with a ClassCastException.
                      val accuracy = accuracyArg match {
                        case None => ApproximatePercentile.DEFAULT_PERCENTILE_ACCURACY
                        case Some(accuracyLitRex: RexLiteral) => accuracyLitRex.getValue
                        case Some(_) => throw invalidPercentileArgs
                      }
                      percentile_approx(col(argNames.head), lit(percentage), lit(accuracy)).alias(aggName)
                    }
                  case _ =>
                    throw invalidPercentileArgs
                }
              case _ =>
                throw invalidPercentileArgs
            }
          case FunctionDesc.FUNC_SUM =>
            // TODO Use improved sum decimal precision by default in future
            // Improved precision: sum in the column's own type and cast the result;
            // otherwise cast the input before summing.
            if (KylinRelDataTypeSystem.getProjectConfig.isImprovedSumDecimalPrecisionEnabled) {
              if (isSum0(call)) {
                sum0(col(argNames.head))
                  .cast(SparderTypeUtil.convertSqlTypeToSparkType(inputType))
                  .alias(aggName)
              } else {
                sum(col(argNames.head))
                  .cast(SparderTypeUtil.convertSqlTypeToSparkType(inputType))
                  .alias(aggName)
              }
            } else {
              if (isSum0(call)) {
                sum0(
                  col(argNames.head).cast(
                    SparderTypeUtil.convertSqlTypeToSparkType(inputType)))
                  .alias(aggName)
              } else {
                sum(
                  col(argNames.head).cast(
                    SparderTypeUtil.convertSqlTypeToSparkType(inputType)))
                  .alias(aggName)
              }
            }
          case FunctionDesc.FUNC_COUNT =>
            // COUNT(*) has no arguments and is counted over the literal 1.
            count(if (argNames.isEmpty) k_lit(1) else col(argNames.head))
              .alias(aggName)
          case FunctionDesc.FUNC_MAX =>
            max(
              col(argNames.head).cast(
                SparderTypeUtil.convertSqlTypeToSparkType(inputType)))
              .alias(aggName)
          case FunctionDesc.FUNC_MIN =>
            min(
              col(argNames.head).cast(
                SparderTypeUtil.convertSqlTypeToSparkType(inputType)))
              .alias(aggName)
          case FunctionDesc.FUNC_COUNT_DISTINCT if call.getAggregation.getName == "BITMAP_COUNT" =>
            KapFunctions.precise_count_distinct(col(argNames.head)).alias(aggName)
          case FunctionDesc.FUNC_COUNT_DISTINCT =>
            countDistinct(argNames.head, argNames.drop(1): _*)
              .alias(aggName)
          case FunctionDesc.FUNC_BITMAP_BUILD =>
            KapFunctions.precise_bitmap_build_pushdown(columnName.head).alias(aggName)
          // Issue 4337: Supported select (select '2012-01-02') as data, xxx from table group by xxx
          case SqlKind.SINGLE_VALUE.sql =>
            // NOTE(review): the field name comes from the first input attribute (schema.head), not from
            // the argument column — confirm this is intended.
            val structField = StructField(schema.head.name, SparderTypeUtil.convertSqlTypeToSparkType(inputType), true, Metadata.empty)
            SingleValueAgg(structField).apply(col(argNames.head)).alias(aggName)
          case FunctionDesc.FUNC_GROUPING =>
            if (!rel.isSimpleGroupType) {
              grouping(argNames.head).alias(aggName)
            } else {
              // Simple grouping: the result is a constant — 0 when the argument is a group key, 1 otherwise.
              if (rel.getRewriteGroupKeys.contains(call.getArgList.get(0))) {
                k_lit(0).alias(aggName)
              } else {
                k_lit(1).alias(aggName)
              }
            }
          case FunctionDesc.FUNC_COLLECT_SET =>
            call match {
              // A KylinAggregateCall input holds arrays already, so they are flattened and de-duplicated.
              case _: KylinAggregateCall =>
                array_distinct(flatten(collect_set(col(argNames.head))))
                  .alias(aggName)
              case _ =>
                collect_set(col(argNames.head)).alias(aggName)
            }
          case FunctionDesc.FUNC_INTERSECT_BITMAP_UUID_DISTINCT =>
            KapFunctions.bitmap_uuid_func(col(argNames.head), BinaryType, BitmapFuncType.INTERSECT).alias(aggName)
          case FunctionDesc.FUNC_INTERSECT_BITMAP_UUID_COUNT =>
            KapFunctions.bitmap_uuid_func(col(argNames.head), IntegerType, BitmapFuncType.INTERSECT).alias(aggName)
          case FunctionDesc.FUNC_INTERSECT_BITMAP_UUID_VALUE =>
            // limit (argument 1) and offset (argument 2) must be constant literals projected by the child rel.
            rel.getInput match {
              case projectRel: OlapProjectRel =>
                val limitArg = projectRel.getProjects.get(call.getArgList.get(1))
                val offsetArg = projectRel.getProjects.get(call.getArgList.get(2))
                (limitArg, offsetArg) match {
                  case (limitLit: RexLiteral, offsetLit: RexLiteral) =>
                    val limit = limitLit.getValue.toString.toInt
                    val offset = offsetLit.getValue.toString.toInt
                    KapFunctions.bitmap_uuid_page_func(col(argNames.head), limit, offset, ArrayType(LongType, false),
                      BitmapFuncType.INTERSECT).alias(aggName)
                  case _ =>
                    throw new UnsupportedOperationException(unSupportedOperationEp)
                }
              case _ =>
                throw new UnsupportedOperationException(unSupportedOperationEp)
            }
          case FunctionDesc.FUNC_INTERSECT_BITMAP_UUID_VALUE_ALL =>
            KapFunctions.bitmap_uuid_func(col(argNames.head), ArrayType(LongType, false), BitmapFuncType.INTERSECT).alias(aggName)
          case FunctionDesc.FUNC_UNION_BITMAP_UUID_DISTINCT =>
            KapFunctions.bitmap_uuid_func(col(argNames.head), BinaryType, BitmapFuncType.UNION).alias(aggName)
          case FunctionDesc.FUNC_UNION_BITMAP_UUID_COUNT =>
            KapFunctions.bitmap_uuid_func(col(argNames.head), IntegerType, BitmapFuncType.UNION).alias(aggName)
          case FunctionDesc.FUNC_UNION_BITMAP_UUID_VALUE =>
            // limit (argument 1) and offset (argument 2) must be constant literals projected by the child rel.
            rel.getInput match {
              case projectRel: OlapProjectRel =>
                val limitArg = projectRel.getProjects.get(call.getArgList.get(1))
                val offsetArg = projectRel.getProjects.get(call.getArgList.get(2))
                (limitArg, offsetArg) match {
                  case (limitLit: RexLiteral, offsetLit: RexLiteral) =>
                    val limit = limitLit.getValue.toString.toInt
                    val offset = offsetLit.getValue.toString.toInt
                    KapFunctions.bitmap_uuid_page_func(col(argNames.head), limit, offset, ArrayType(LongType, false),
                      BitmapFuncType.UNION).alias(aggName)
                  case _ =>
                    throw new UnsupportedOperationException(unSupportedOperationEp)
                }
              case _ =>
                throw new UnsupportedOperationException(unSupportedOperationEp)
            }
          case FunctionDesc.FUNC_UNION_BITMAP_UUID_VALUE_ALL =>
            KapFunctions.bitmap_uuid_func(col(argNames.head), ArrayType(LongType, false), BitmapFuncType.UNION).alias(aggName)
          case _ =>
            throw new IllegalArgumentException(
              s"""Unsupported function name $funcName""")
        }
    }.toList
  }