in wayang-api/wayang-api-sql/src/main/java/org/apache/wayang/api/sql/calcite/converter/functions/AggregateFunction.java [43:96]
/**
 * Merges two partially aggregated records into a single record.
 *
 * <p>Field layout read by this method, for records of size {@code n} and {@code k} entries in
 * {@code aggregateKinds}:
 * <ul>
 * <li>fields {@code [0, n - k - 1)} are copied unchanged from {@code record1};</li>
 * <li>fields {@code [n - k - 1, n - 1)} hold one running value per aggregate, in the iteration
 * order of {@code aggregateKinds};</li>
 * <li>field {@code n - 1} is an integer counter that is summed once when at least one AVG
 * aggregate is present.</li>
 * </ul>
 *
 * <p>NOTE(review): when no AVG aggregate is present the trailing field of the result is never
 * written and therefore stays {@code null} - confirm that downstream consumers do not read it
 * in that case.
 *
 * @param record1 left partial aggregate; also the source of the pass-through fields
 * @param record2 right partial aggregate, read at the same positions as {@code record1}
 * @return a new record with the same number of fields as {@code record1}
 * @throws IllegalStateException if an aggregate kind is not supported, or if a COUNT column
 *                               does not hold {@link Integer} values in both records
 */
public Record apply(final Record record1, final Record record2) {
    final int size = record1.size();
    final int sharedCountIndex = size - 1;
    final int firstAggregateIndex = sharedCountIndex - aggregateKinds.size();
    final Object[] resValues = new Object[size];

    // Leading (non-aggregate) fields are taken from the first record only.
    for (int i = 0; i < firstAggregateIndex; i++) {
        resValues[i] = record1.getField(i);
    }

    // The shared counter must be summed at most once, however many AVG aggregates there are.
    boolean countDone = false;
    int counter = firstAggregateIndex;
    for (final SqlKind kind : aggregateKinds) {
        final Object field1 = record1.getField(counter);
        final Object field2 = record2.getField(counter);

        switch (kind) {
            case SUM:
                resValues[counter] = this.castAndMap(field1, field2, null, Long::sum, Integer::sum, Double::sum);
                break;
            case MIN:
                resValues[counter] = this.castAndMap(field1, field2, SqlFunctions::least, SqlFunctions::least,
                        SqlFunctions::least, SqlFunctions::least);
                break;
            case MAX:
                resValues[counter] = this.castAndMap(field1, field2, SqlFunctions::greatest,
                        SqlFunctions::greatest, SqlFunctions::greatest, SqlFunctions::greatest);
                break;
            case COUNT:
                // since aggregates inject an extra column for counting before,
                // see AggregateAddCols. the column we operate on are integer counts,
                // which means we can eagerly get the fields as integers and simply sum.
                // Checked explicitly (not via assert) so the diagnostic survives when
                // assertions are disabled.
                if (!(field1 instanceof Integer) || !(field2 instanceof Integer)) {
                    throw new IllegalStateException(
                            "Expected to find integers for count but found: " + field1 + " and " + field2);
                }
                resValues[counter] = (Integer) field1 + (Integer) field2;
                break;
            case AVG:
                // AVG carries a running sum of the aggregated column, so it is folded exactly
                // like SUM instead of being restricted to Integer values. The division by the
                // counter is not performed here.
                resValues[counter] = this.castAndMap(field1, field2, null, Long::sum, Integer::sum, Double::sum);
                if (!countDone) {
                    resValues[sharedCountIndex] = record1.getInt(sharedCountIndex) + record2.getInt(sharedCountIndex);
                    countDone = true;
                }
                break;
            default:
                throw new IllegalStateException("Unsupported operation: " + kind);
        }
        counter++;
    }

    return new Record(resValues);
}