in processing/src/main/java/org/apache/druid/query/topn/PooledTopNAlgorithm.java [453:675]
/**
 * Walks the cursor over the rows of the granularizer's current bucket and hands every dimension value of every
 * row to {@code aggregateDimValue}, threading the position returned by each call into the next one.
 *
 * <p>The values of a row are consumed in a manually unrolled fashion: the first
 * {@code row.size() % AGG_UNROLL_COUNT} values are handled by a fall-through switch (highest index first, down to
 * index 0), and the remaining values are handled eight at a time in ascending index order. The switch only has
 * cases 1 through 7 and the loop body contains eight calls, so this method relies on {@code AGG_UNROLL_COUNT}
 * (declared outside this method) being 8.
 *
 * @param params         source of the results buffer, record size, aggregator sizes, cursor, granularizer and
 *                       dimension selector used by the scan
 * @param positions      passed through unchanged to {@code aggregateDimValue}; presumably the per-dimension-value
 *                       buffer positions (NOTE(review): confirm against {@code aggregateDimValue})
 * @param theAggregators aggregators passed through to {@code aggregateDimValue}; the array length also determines
 *                       the aggregator count and unroll remainder handed to it
 *
 * @return the number of rows visited; 0 when the granularizer reports that the current offset is not within the
 * bucket
 *
 * @throws UnsupportedOperationException if {@code params.getCardinality()} is negative
 */
private static long scanAndAggregateDefault(
    final PooledTopNParams params,
    final int[] positions,
    final BufferAggregator[] theAggregators
)
{
  if (params.getCardinality() < 0) {
    throw new UnsupportedOperationException("Cannot operate on a dimension with unknown cardinality");
  }

  final ByteBuffer buffer = params.getResultsBuf();
  final int recordBytes = params.getNumBytesPerRecord();
  final int[] sizes = params.getAggregatorSizes();
  final Cursor rowCursor = params.getCursor();
  final CursorGranularizer bucket = params.getGranularizer();
  final DimensionSelector selector = params.getDimSelector();

  // The offset of each aggregator is the sum of the sizes of all aggregators that precede it.
  final int[] offsets = new int[sizes.length];
  int runningOffset = 0;
  for (int idx = 0; idx < sizes.length; idx++) {
    offsets[idx] = runningOffset;
    runningOffset += sizes[idx];
  }

  final int aggCount = theAggregators.length;
  final int aggRemainder = aggCount % AGG_UNROLL_COUNT;

  // Nothing to scan when the current offset falls outside the bucket; the cursor is left untouched.
  if (!bucket.currentOffsetWithinBucket()) {
    return 0L;
  }

  int nextPosition = 0;
  long rowsSeen = 0;
  boolean hasRow = !rowCursor.isDoneOrInterrupted();
  while (hasRow) {
    final IndexedInts row = selector.getRow();
    final int valueCount = row.size();
    final int leftover = valueCount % AGG_UNROLL_COUNT;

    // Handle the values that do not fill a whole unrolled group. Entering at "leftover" and falling through
    // processes indices leftover - 1 down to 0; a leftover of 0 skips the switch entirely.
    switch (leftover) {
      case 7:
        nextPosition = aggregateDimValue(
            positions,
            theAggregators,
            buffer,
            recordBytes,
            offsets,
            aggCount,
            aggRemainder,
            row.get(6),
            nextPosition
        );
        // fall through
      case 6:
        nextPosition = aggregateDimValue(
            positions,
            theAggregators,
            buffer,
            recordBytes,
            offsets,
            aggCount,
            aggRemainder,
            row.get(5),
            nextPosition
        );
        // fall through
      case 5:
        nextPosition = aggregateDimValue(
            positions,
            theAggregators,
            buffer,
            recordBytes,
            offsets,
            aggCount,
            aggRemainder,
            row.get(4),
            nextPosition
        );
        // fall through
      case 4:
        nextPosition = aggregateDimValue(
            positions,
            theAggregators,
            buffer,
            recordBytes,
            offsets,
            aggCount,
            aggRemainder,
            row.get(3),
            nextPosition
        );
        // fall through
      case 3:
        nextPosition = aggregateDimValue(
            positions,
            theAggregators,
            buffer,
            recordBytes,
            offsets,
            aggCount,
            aggRemainder,
            row.get(2),
            nextPosition
        );
        // fall through
      case 2:
        nextPosition = aggregateDimValue(
            positions,
            theAggregators,
            buffer,
            recordBytes,
            offsets,
            aggCount,
            aggRemainder,
            row.get(1),
            nextPosition
        );
        // fall through
      case 1:
        nextPosition = aggregateDimValue(
            positions,
            theAggregators,
            buffer,
            recordBytes,
            offsets,
            aggCount,
            aggRemainder,
            row.get(0),
            nextPosition
        );
    }

    // The remaining value count is a multiple of AGG_UNROLL_COUNT, so every pass consumes one full group of eight.
    for (int base = leftover; base < valueCount; base += AGG_UNROLL_COUNT) {
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base),
          nextPosition
      );
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base + 1),
          nextPosition
      );
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base + 2),
          nextPosition
      );
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base + 3),
          nextPosition
      );
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base + 4),
          nextPosition
      );
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base + 5),
          nextPosition
      );
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base + 6),
          nextPosition
      );
      nextPosition = aggregateDimValue(
          positions,
          theAggregators,
          buffer,
          recordBytes,
          offsets,
          aggCount,
          aggRemainder,
          row.get(base + 7),
          nextPosition
      );
    }

    rowsSeen++;
    // Stop as soon as the granularizer cannot advance within the bucket; only when it did advance is the
    // cursor's done/interrupted state consulted again.
    hasRow = bucket.advanceCursorWithinBucketUninterruptedly() && !rowCursor.isDoneOrInterrupted();
  }
  return rowsSeen;
}