protected List gapFillAndAggregate()

in pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SumAvgGapfillProcessor.java [97:217]


  /**
   * Walks the gapfill time buckets in order, maintains running SUM/COUNT state over the latest
   * accepted row of every group, and rolls every {@code _aggregationSize} buckets up into one
   * result row.
   *
   * <p>NOTE(review): the bucketing pass stops at the first row whose bucket index is past the
   * gapfill window, so it presumably expects {@code rows} to be ordered by time ascending -
   * confirm against the caller.
   *
   * @param rows raw rows to gapfill; the time column is read at {@code _timeBucketColumnIndex}
   * @param dataSchema schema of {@code rows}; used for the post-gapfill filter and to decide
   *     whether the emitted time value is a {@code long} or a formatted value
   * @param resultTableSchema schema of the aggregated rows; used for the HAVING filter
   * @return the aggregated rows that pass the HAVING filter, cut off once
   *     {@code _limitForAggregatedResult} rows have been collected
   */
  protected List<Object[]> gapFillAndAggregate(
      List<Object[]> rows, DataSchema dataSchema, DataSchema resultTableSchema) {
    // timeBucketedRawRows[b] is the index in `rows` of the first row of gapfill bucket b and
    // timeBucketedRawRows[b + 1] is its exclusive end, hence the extra trailing slot.
    int[] timeBucketedRawRows = new int[_numOfTimeBuckets + 1];
    int timeBucketedRawRowsIndex = 0;
    // Row index (exclusive) at which the gapfill window ends; defaults to "all rows".
    int lastIndex = rows.size();
    for (int i = 0; i < rows.size(); i++) {
      Object[] row = rows.get(i);
      long time = _dateTimeFormatter.fromFormatToMillis(String.valueOf(row[_timeBucketColumnIndex]));
      int index = findGapfillBucketIndex(time);
      if (index >= _numOfTimeBuckets) {
        // First row past the gapfill window: every bucket that has not started yet must end
        // here. This has to be the ROW index (i), not the bucket index, because the boundaries
        // array is later used to index into `rows`. The fill loop below writes the slots.
        lastIndex = i;
        break;
      }
      Key key = constructGroupKeys(row);
      _groupByKeys.putIfAbsent(key, _groupByKeys.size());
      if (index < 0) {
        // The row is before the gapfill window: it can potentially be used as the previous
        // value of its group, so remember the index of the latest such row per group.
        final int currentRowIndex = i;
        _previousIndexByGroupKey.compute(key, (k, previousRowIndex) -> {
          if (previousRowIndex == null) {
            return currentRowIndex;
          }
          // Compare parsed timestamps rather than casting the raw cell to Long, so that time
          // columns stored in a non-LONG type do not fail with a ClassCastException.
          long previousTime = _dateTimeFormatter.fromFormatToMillis(
              String.valueOf(rows.get(previousRowIndex)[_timeBucketColumnIndex]));
          return time > previousTime ? currentRowIndex : previousRowIndex;
        });
      } else {
        // Open this bucket, and any empty buckets skipped before it, at the current row.
        while (index >= timeBucketedRawRowsIndex) {
          timeBucketedRawRows[timeBucketedRawRowsIndex++] = i;
        }
      }
    }
    // Buckets that never received a row are empty ranges ending at the window's last row.
    while (timeBucketedRawRowsIndex < _numOfTimeBuckets + 1) {
      timeBucketedRawRows[timeBucketedRawRowsIndex++] = lastIndex;
    }

    // Group-key index -> index of the row currently contributing to _sumes/_count for that group.
    _filteredMap = new HashMap<>();

    if (_queryContext.getSubquery() != null && _queryContext.getFilter() != null) {
      _postGapfillFilterHandler = new GapfillFilterHandler(_queryContext.getFilter(), dataSchema);
    }

    if (_queryContext.getHavingFilter() != null) {
      _postAggregateHavingFilterHandler =
          new GapfillFilterHandler(_queryContext.getHavingFilter(), resultTableSchema);
    }

    // NOTE(review): presumably seeds _sumes/_count/_filteredMap from the pre-window rows
    // recorded above - confirm in initializeAggregationValues.
    initializeAggregationValues(rows, dataSchema);

    List<Object[]> result = new ArrayList<>();

    // Accumulators for the aggregation window currently being built.
    double[] aggregatedSum = new double[_columnTypes.length];
    long aggregatedCount = 0;
    for (long time = _startMs; time < _endMs; time += _gapfillTimeBucketSize) {
      int timeBucketIndex = findGapfillBucketIndex(time);
      for (int i = timeBucketedRawRows[timeBucketIndex]; i < timeBucketedRawRows[timeBucketIndex + 1]; i++) {
        Object[] resultRow = rows.get(i);
        Key key = constructGroupKeys(resultRow);
        int groupKeyIndex = _groupByKeys.get(key);
        if (_filteredMap.containsKey(groupKeyIndex)) {
          // A newer row for this group replaces the one carried so far: take the old row's
          // contribution out of the running totals before considering the new one.
          Object[] replacedRow = rows.get(_filteredMap.get(groupKeyIndex));
          for (int j = 0; j < _columnTypes.length; j++) {
            if (_columnTypes[j] == 0) {
              continue;
            }
            _sumes[j] -= ((Number) (replacedRow[_sumArgIndexes[j]])).doubleValue();
          }
          _filteredMap.remove(groupKeyIndex);
          _count--;
        }
        if (_postGapfillFilterHandler == null || _postGapfillFilterHandler.isMatch(resultRow)) {
          _count++;
          for (int j = 0; j < _columnTypes.length; j++) {
            if (_columnTypes[j] == 0) {
              continue;
            }
            _sumes[j] += ((Number) (resultRow[_sumArgIndexes[j]])).doubleValue();
          }
          _filteredMap.put(groupKeyIndex, i);
        }
      }
      // Fold this bucket's running totals into the current aggregation window.
      if (_count > 0) {
        aggregatedCount += _count;
        for (int i = 0; i < _columnTypes.length; i++) {
          if (_columnTypes[i] != 0) {
            aggregatedSum[i] += _sumes[i];
          }
        }
      }
      // Emit a row once the last bucket of an aggregation window has been processed.
      if ((timeBucketIndex + 1) % _aggregationSize == 0 && aggregatedCount > 0) {
        // `time` is the start of the window's last bucket; step back to the window's first bucket.
        long aggregationWindowStart = time - (_aggregationSize - 1) * _gapfillTimeBucketSize;
        Object[] aggregatedRow = new Object[_queryContext.getSelectExpressions().size()];
        for (int i = 0; i < _columnTypes.length; i++) {
          if (_columnTypes[i] == 0) {
            // Column type 0 is filled with the window's time value.
            if (dataSchema.getColumnDataType(_timeBucketColumnIndex) == DataSchema.ColumnDataType.LONG) {
              aggregatedRow[i] = aggregationWindowStart;
            } else {
              aggregatedRow[i] = _dateTimeFormatter.fromMillisToFormat(aggregationWindowStart);
            }
          } else if (_columnTypes[i] == COLUMN_TYPE_SUM) {
            aggregatedRow[i] = aggregatedSum[i];
          } else { //COLUMN_TYPE_AVG
            aggregatedRow[i] = aggregatedSum[i] / aggregatedCount;
          }
        }
        // Start a fresh window regardless of whether the row survives the HAVING filter.
        aggregatedSum = new double[_columnTypes.length];
        aggregatedCount = 0;

        if (_postAggregateHavingFilterHandler == null || _postAggregateHavingFilterHandler.isMatch(aggregatedRow)) {
          result.add(aggregatedRow);
        }
        if (result.size() >= _limitForAggregatedResult) {
          return result;
        }
      }
    }
    return result;
  }