in pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SumAvgGapfillProcessor.java [97:217]
/**
 * Gap-fills {@code rows} bucket by bucket and folds the filled values into SUM / AVG results, emitting
 * one output row each time {@code _aggregationSize} consecutive gapfill time buckets have been seen
 * (and at least one group contributed to them).
 *
 * <p>For every group key the most recently seen row that passes the post-gapfill filter (if any) is
 * remembered in {@code _filteredMap}. {@code _sumes} holds the running sums of those rows and
 * {@code _count} the number of groups currently contributing. Each bucket adds the carried-forward
 * totals into the current aggregation window; when the window closes an output row is built whose
 * time column is the start of the window.
 *
 * <p>NOTE(review): the bucketing pass assumes {@code rows} is sorted ascending by the time bucket
 * column. It stops at the first row past the gapfill range and assigns bucket boundaries in row order.
 * Confirm this against the caller.
 *
 * @param rows input rows to gap-fill
 * @param dataSchema schema of {@code rows}; used by the post-gapfill filter and to decide how the
 *     output time column is rendered
 * @param resultTableSchema schema of the aggregated output rows; used by the HAVING filter
 * @return the aggregated rows; collection stops once {@code _limitForAggregatedResult} rows exist
 */
protected List<Object[]> gapFillAndAggregate(
    List<Object[]> rows, DataSchema dataSchema, DataSchema resultTableSchema) {
  // timeBucketedRawRows[b] is the index of the first row whose bucket is >= b, so the rows that belong
  // to bucket b are [timeBucketedRawRows[b], timeBucketedRawRows[b + 1]).
  int[] timeBucketedRawRows = new int[_numOfTimeBuckets + 1];
  int timeBucketedRawRowsIndex = 0;
  // Exclusive end (a ROW index) of the rows that fall before the end of the gapfill range.
  int endOfInRangeRows = rows.size();
  for (int i = 0; i < rows.size(); i++) {
    Object[] row = rows.get(i);
    long time = _dateTimeFormatter.fromFormatToMillis(String.valueOf(row[_timeBucketColumnIndex]));
    int index = findGapfillBucketIndex(time);
    if (index >= _numOfTimeBuckets) {
      // First row past the gapfill range: all remaining bucket boundaries are this row's index.
      // This must be the row index i, not the bucket index, or the bucket loop below can read rows
      // outside the range (or past the end of the list).
      endOfInRangeRows = i;
      break;
    }
    Key key = constructGroupKeys(row);
    _groupByKeys.putIfAbsent(key, _groupByKeys.size());
    if (index < 0) {
      // the data can potentially be used for previous value
      final int currentRowIndex = i;
      _previousIndexByGroupKey.compute(key, (k, previousRowIndex) -> {
        if (previousRowIndex == null) {
          return currentRowIndex;
        }
        // Compare in epoch millis: the time column is not necessarily a LONG (it may be a formatted
        // value), so casting it to Long would throw ClassCastException.
        long previousTime = _dateTimeFormatter.fromFormatToMillis(
            String.valueOf(rows.get(previousRowIndex)[_timeBucketColumnIndex]));
        return time > previousTime ? currentRowIndex : previousRowIndex;
      });
    } else {
      // Every not-yet-assigned boundary up to this row's bucket starts at this row.
      while (index >= timeBucketedRawRowsIndex) {
        timeBucketedRawRows[timeBucketedRawRowsIndex++] = i;
      }
    }
  }
  // Buckets with no rows of their own (and the closing sentinel) end at the last in-range row.
  while (timeBucketedRawRowsIndex < _numOfTimeBuckets + 1) {
    timeBucketedRawRows[timeBucketedRawRowsIndex++] = endOfInRangeRows;
  }

  _filteredMap = new HashMap<>();
  if (_queryContext.getSubquery() != null && _queryContext.getFilter() != null) {
    _postGapfillFilterHandler = new GapfillFilterHandler(_queryContext.getFilter(), dataSchema);
  }
  if (_queryContext.getHavingFilter() != null) {
    _postAggregateHavingFilterHandler =
        new GapfillFilterHandler(_queryContext.getHavingFilter(), resultTableSchema);
  }
  // NOTE(review): presumably seeds _sumes/_count/_filteredMap from the pre-range rows; confirm.
  initializeAggregationValues(rows, dataSchema);

  // Loop-invariant: whether the output time column is emitted as epoch millis or formatted.
  boolean isLongTimeColumn =
      dataSchema.getColumnDataType(_timeBucketColumnIndex) == DataSchema.ColumnDataType.LONG;
  List<Object[]> result = new ArrayList<>();
  double[] aggregatedSum = new double[_columnTypes.length];
  long aggregatedCount = 0;
  for (long time = _startMs; time < _endMs; time += _gapfillTimeBucketSize) {
    int timeBucketIndex = findGapfillBucketIndex(time);
    for (int i = timeBucketedRawRows[timeBucketIndex]; i < timeBucketedRawRows[timeBucketIndex + 1]; i++) {
      Object[] resultRow = rows.get(i);
      Key key = constructGroupKeys(resultRow);
      int groupKeyIndex = _groupByKeys.get(key);
      // Retire this group's previously carried-forward row before (possibly) replacing it.
      if (_filteredMap.containsKey(groupKeyIndex)) {
        Object[] previousRow = rows.get(_filteredMap.get(groupKeyIndex));
        for (int j = 0; j < _columnTypes.length; j++) {
          if (_columnTypes[j] != 0) {
            _sumes[j] -= ((Number) previousRow[_sumArgIndexes[j]]).doubleValue();
          }
        }
        _filteredMap.remove(groupKeyIndex);
        _count--;
      }
      if (_postGapfillFilterHandler == null || _postGapfillFilterHandler.isMatch(resultRow)) {
        _count++;
        for (int j = 0; j < _columnTypes.length; j++) {
          if (_columnTypes[j] != 0) {
            _sumes[j] += ((Number) resultRow[_sumArgIndexes[j]]).doubleValue();
          }
        }
        _filteredMap.put(groupKeyIndex, i);
      }
    }
    // Fold this bucket's carried-forward totals into the current aggregation window.
    if (_count > 0) {
      aggregatedCount += _count;
      for (int i = 0; i < _columnTypes.length; i++) {
        if (_columnTypes[i] != 0) {
          aggregatedSum[i] += _sumes[i];
        }
      }
    }
    // The window closes after every _aggregationSize buckets; windows with no contributions emit nothing.
    if ((timeBucketIndex + 1) % _aggregationSize == 0 && aggregatedCount > 0) {
      Object[] aggregatedRow = new Object[_queryContext.getSelectExpressions().size()];
      for (int i = 0; i < _columnTypes.length; i++) {
        if (_columnTypes[i] == 0) {
          // Time column: label the row with the start of the window that just closed.
          long windowStartMs = time - (_aggregationSize - 1) * _gapfillTimeBucketSize;
          aggregatedRow[i] = isLongTimeColumn ? (Object) windowStartMs
              : _dateTimeFormatter.fromMillisToFormat(windowStartMs);
        } else if (_columnTypes[i] == COLUMN_TYPE_SUM) {
          aggregatedRow[i] = aggregatedSum[i];
        } else { //COLUMN_TYPE_AVG
          aggregatedRow[i] = aggregatedSum[i] / aggregatedCount;
        }
      }
      aggregatedSum = new double[_columnTypes.length];
      aggregatedCount = 0;
      if (_postAggregateHavingFilterHandler == null || _postAggregateHavingFilterHandler.isMatch(aggregatedRow)) {
        result.add(aggregatedRow);
      }
      if (result.size() >= _limitForAggregatedResult) {
        return result;
      }
    }
  }
  return result;
}