in plugins/transforms/memgroupby/src/main/java/org/apache/hop/pipeline/transforms/memgroupby/MemoryGroupBy.java [237:393]
/**
 * Folds one input row into the in-memory aggregate of the group it belongs to.
 *
 * <p>The group key is built from the row fields at {@code data.groupnrs}. When no aggregate is
 * stored in {@code data.map} for that key yet, a new one is initialised with {@code newAggregate}
 * and stored. Every configured aggregation is then updated with the row's subject value found at
 * the matching index of {@code data.subjectnrs}.
 *
 * @param r the input row, laid out according to {@code data.inputRowMeta}
 * @throws HopException propagated from the value-metadata operations used here (number/string
 *     conversion, comparison, summing)
 */
void addToAggregate(Object[] r) throws HopException {
  Object[] groupData = new Object[data.groupMeta.size()];
  for (int i = 0; i < data.groupnrs.length; i++) {
    groupData[i] = r[data.groupnrs[i]];
  }
  HashEntry entry = data.getHashEntry(groupData);

  Aggregate aggregate = data.map.get(entry);
  if (aggregate == null) {
    // First row seen for this group: create its aggregate and store it in the map.
    aggregate = new Aggregate();
    newAggregate(r, aggregate);
    data.map.put(entry, aggregate);
  }

  for (int i = 0; i < data.subjectnrs.length; i++) {
    Object subj = r[data.subjectnrs[i]];
    IValueMeta subjMeta = data.inputRowMeta.getValueMeta(data.subjectnrs[i]);
    Object value = aggregate.agg[i];
    IValueMeta valueMeta = data.aggMeta.getValueMeta(i);
    GAggregate agg = meta.getAggregates().get(i);
    switch (agg.getType()) {
      case Sum:
        aggregate.agg[i] = ValueDataUtil.sum(valueMeta, value, subjMeta, subj);
        break;
      case Average:
        // Sum and count only non-null values; counts[i] is the divisor for the average.
        if (!subjMeta.isNull(subj)) {
          aggregate.agg[i] = ValueDataUtil.sum(valueMeta, value, subjMeta, subj);
          aggregate.counts[i]++;
        }
        break;
      case Median, Percentile:
        // Collect every non-null value; the list is held in aggregate.agg[i].
        if (!subjMeta.isNull(subj)) {
          ((List<Double>) aggregate.agg[i]).add(subjMeta.getNumber(subj));
        }
        break;
      case StandardDeviation:
        if (aggregate.mean == null) {
          aggregate.mean = new double[meta.getAggregates().size()];
        }
        // Null subjects are skipped, consistent with Average and Median. getNumber(...) can
        // yield null for a null subject, and unboxing that into a double would throw a
        // NullPointerException; counting it would also bias n.
        if (!subjMeta.isNull(subj)) {
          // Welford's online update: aggregate.mean[i] holds the running mean and
          // aggregate.agg[i] the running sum of squared deviations from the mean.
          aggregate.counts[i]++;
          double n = aggregate.counts[i];
          double x = subjMeta.getNumber(subj);
          // A not-yet-initialised accumulator is treated as exactly 0.
          double sum = value == null ? 0.0 : (Double) value;
          double mean = aggregate.mean[i];
          double delta = x - mean;
          mean = mean + (delta / n);
          sum = sum + delta * (x - mean);
          aggregate.mean[i] = mean;
          aggregate.agg[i] = sum;
        }
        break;
      case CountDistinct:
        if (aggregate.distinctObjs == null) {
          aggregate.distinctObjs = new Set[meta.getAggregates().size()];
        }
        if (aggregate.distinctObjs[i] == null) {
          aggregate.distinctObjs[i] = new TreeSet<>();
        }
        if (!subjMeta.isNull(subj)) {
          Object obj = subjMeta.convertToNormalStorageType(subj);
          // byte[] is not Comparable and cannot be added to a TreeSet. The value can be a
          // binary array that was typed as String, so it is converted to a String to be
          // compared (and displayed) correctly.
          if (obj instanceof byte[] bytes) {
            obj = new String(bytes);
          }
          // Set.add ignores duplicates, so no contains() pre-check is needed.
          aggregate.distinctObjs[i].add(obj);
        }
        aggregate.counts[i] = aggregate.distinctObjs[i].size();
        break;
      case CountAll:
        // Counts non-null values only.
        if (!subjMeta.isNull(subj)) {
          aggregate.counts[i]++;
        }
        break;
      case CountAny:
        // Counts every row, nulls included.
        aggregate.counts[i]++;
        break;
      case Minimum:
        boolean subjIsNull = subjMeta.isNull(subj);
        boolean valueIsNull = valueMeta.isNull(value);
        if (minNullIsValued || (!subjIsNull && !valueIsNull)) {
          // Either nulls take part in the comparison (minNullIsValued), or both sides are
          // non-null.
          aggregate.agg[i] = subjMeta.compare(subj, valueMeta, value) < 0 ? subj : value;
        } else if (valueIsNull && !subjIsNull) {
          // Nulls are ignored: the first non-null value seeds the minimum.
          aggregate.agg[i] = subj;
        }
        break;
      case Maximum:
        // Null ordering is left to subjMeta.compare.
        if (subjMeta.compare(subj, valueMeta, value) > 0) {
          aggregate.agg[i] = subj;
        }
        break;
      case First:
        // Keep the first non-null value.
        if (!subjMeta.isNull(subj) && value == null) {
          aggregate.agg[i] = subj;
        }
        break;
      case Last:
        // Keep the most recent non-null value.
        if (!subjMeta.isNull(subj)) {
          aggregate.agg[i] = subj;
        }
        break;
      case FirstIncludingNull:
        // counts[i] acts as a "first value already taken" flag, since the first value may be null.
        if (aggregate.counts[i] == 0) {
          aggregate.agg[i] = subj;
          aggregate.counts[i]++;
        }
        break;
      case LastIncludingNull:
        aggregate.agg[i] = subj;
        break;
      case ConcatComma:
        if (subj != null) {
          StringBuilder sb = (StringBuilder) value;
          if (sb.length() > 0) {
            sb.append(", ");
          }
          sb.append(subjMeta.getString(subj));
        }
        break;
      case ConcatString:
        if (subj != null) {
          // The separator comes from the aggregate's value field, with variables resolved.
          String separator = "";
          if (!Utils.isEmpty(agg.getValueField())) {
            separator = resolve(agg.getValueField());
          }
          StringBuilder sb = (StringBuilder) value;
          if (sb.length() > 0) {
            sb.append(separator);
          }
          sb.append(subjMeta.getString(subj));
        }
        break;
      case ConcatDistinct:
        if (subj != null) {
          SortedSet<Object> set = (SortedSet<Object>) value;
          set.add(subj);
        }
        break;
      default:
        break;
    }
  }
}