in spark-load/spark-load-dpp/src/main/java/org/apache/doris/load/loadv2/dpp/SparkRDDAggregator.java [61:153]
/**
 * Builds the value aggregator matching a column's aggregation type (and, for
 * max/min/sum, its column type).
 *
 * <p>Dispatch is done on the lower-cased {@code aggregationType}:
 * bitmap_union and hll_union map to their dedicated aggregators regardless of
 * column type; max/min/sum further dispatch on {@code columnType} to pick a
 * numeric, string, largeint or decimal implementation; replace and
 * replace_if_not_null are type-independent.
 *
 * @param column the ETL column descriptor; {@code aggregationType} must be a
 *               supported aggregate type, and for max/min/sum
 *               {@code columnType} must be a supported value type
 * @return an aggregator instance implementing the column's aggregation
 * @throws SparkDppException if the aggregation type is null/unsupported, or
 *                           the column type is null/unsupported for the
 *                           chosen max/min/sum aggregation
 */
public static SparkRDDAggregator buildAggregator(EtlJobConfig.EtlColumn column) throws SparkDppException {
    // StringUtils.lowerCase is null-safe: it returns null for null input, so a
    // missing aggregationType would previously surface as a raw NPE from the
    // String switch below. Guard explicitly and report via the declared
    // exception type instead.
    String aggType = StringUtils.lowerCase(column.aggregationType);
    String columnType = StringUtils.lowerCase(column.columnType);
    if (aggType == null) {
        throw new SparkDppException("aggregation type must not be null");
    }
    switch (aggType) {
        case "bitmap_union":
            return new BitmapUnionAggregator();
        case "hll_union":
            return new HllUnionAggregator();
        case "max":
            // max/min/sum need the column type to choose an implementation;
            // a null columnType would NPE in the nested switch otherwise.
            if (columnType == null) {
                throw new SparkDppException("column type must not be null for max aggregator");
            }
            switch (columnType) {
                case "tinyint":
                case "smallint":
                case "int":
                case "bigint":
                case "float":
                case "double":
                case "decimalv2":
                case "decimal32":
                case "decimal64":
                case "decimal128":
                case "date":
                case "datetime":
                case "datev2":
                case "datetimev2":
                    return new NumberMaxAggregator();
                case "char":
                case "varchar":
                    return new StringMaxAggregator();
                case "largeint":
                    return new LargeIntMaxAggregator();
                default:
                    throw new SparkDppException(
                            String.format("unsupported max aggregator for column type:%s", columnType));
            }
        case "min":
            if (columnType == null) {
                throw new SparkDppException("column type must not be null for min aggregator");
            }
            switch (columnType) {
                case "tinyint":
                case "smallint":
                case "int":
                case "bigint":
                case "float":
                case "double":
                case "decimalv2":
                case "decimal32":
                case "decimal64":
                case "decimal128":
                case "date":
                case "datetime":
                case "datev2":
                case "datetimev2":
                    return new NumberMinAggregator();
                case "char":
                case "varchar":
                    return new StringMinAggregator();
                case "largeint":
                    return new LargeIntMinAggregator();
                default:
                    throw new SparkDppException(
                            String.format("unsupported min aggregator for column type:%s", columnType));
            }
        case "sum":
            if (columnType == null) {
                throw new SparkDppException("column type must not be null for sum aggregator");
            }
            // sum is width-sensitive: each integral/floating type gets its own
            // aggregator so overflow/precision semantics match the column type.
            switch (columnType) {
                case "tinyint":
                    return new ByteSumAggregator();
                case "smallint":
                    return new ShortSumAggregator();
                case "int":
                    return new IntSumAggregator();
                case "bigint":
                    return new LongSumAggregator();
                case "float":
                    return new FloatSumAggregator();
                case "double":
                    return new DoubleSumAggregator();
                case "largeint":
                    return new LargeIntSumAggregator();
                case "decimalv2":
                case "decimal32":
                case "decimal64":
                case "decimal128":
                    return new BigDecimalSumAggregator();
                default:
                    throw new SparkDppException(
                            String.format("unsupported sum aggregator for column type:%s", columnType));
            }
        case "replace_if_not_null":
            return new ReplaceIfNotNullAggregator();
        case "replace":
            return new ReplaceAggregator();
        default:
            throw new SparkDppException(String.format("unsupported aggregate type %s", aggType));
    }
}