in pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java [226:380]
private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
DataTableReducerContext reducerContext)
throws TimeoutException {
long start = System.currentTimeMillis();
assert !dataTablesToReduce.isEmpty();
ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
int numDataTables = dataTables.size();
// Get the number of threads to use for reducing.
int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
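// (Assumption: the helper roughly returns min(numDataTables, maxReduceThreadsPerQuery),
// possibly with a single-thread floor for small responses, so every thread gets work.)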
// Create an indexed table to perform the reduce.
IndexedTable indexedTable =
GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0), _queryContext, reducerContext,
numReduceThreadsToUse, reducerContext.getExecutorService());
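// (Assumption: the factory sizes the table from the query's LIMIT/trim settings and picks
// a concurrent table implementation when numReduceThreadsToUse > 1.)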
// Create groups of data tables that each thread can process concurrently.
// Given that numReduceThreadsToUse <= numDataTables, each group will have at least one data table.
List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
for (int i = 0; i < numReduceThreadsToUse; i++) {
reduceGroups.add(new ArrayList<>());
}
for (int i = 0; i < numDataTables; i++) {
reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
}
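// Round-robin assignment; e.g. 5 data tables on 2 threads yields groups [0, 2, 4] and [1, 3].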
Future<?>[] futures = new Future<?>[numReduceThreadsToUse];
CountDownLatch countDownLatch = new CountDownLatch(numReduceThreadsToUse);
AtomicReference<Throwable> exception = new AtomicReference<>();
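// Records only the first failure from any reduce thread (compareAndSet below); the main
// thread rethrows it once the latch releases.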
ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
for (int i = 0; i < numReduceThreadsToUse; i++) {
List<DataTable> reduceGroup = reduceGroups.get(i);
int taskId = i;
ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
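// Capture the parent thread's execution context so each worker registers with the thread
// accountant under the same query (per-query resource accounting and early termination).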
futures[i] = reducerContext.getExecutorService().submit(new TraceRunnable() {
@Override
public void runJob() {
Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
try {
for (DataTable dataTable : reduceGroup) {
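// When null handling is enabled, the data table carries one RoaringBitmap of null row
// ids per column; after decoding, cells flagged in the bitmap are overwritten with null.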
boolean nullHandlingEnabled = _queryContext.isNullHandlingEnabled();
RoaringBitmap[] nullBitmaps = null;
if (nullHandlingEnabled) {
nullBitmaps = new RoaringBitmap[_numColumns];
for (int colId = 0; colId < _numColumns; colId++) {
nullBitmaps[colId] = dataTable.getNullRowIds(colId);
}
}
int numRows = dataTable.getNumberOfRows();
for (int rowId = 0; rowId < numRows; rowId++) {
// Terminate when the thread is interrupted, which is expected when the query has
// already failed in the main thread. The first check is always performed at rowId = 0.
Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId);
Object[] values = new Object[_numColumns];
for (int colId = 0; colId < _numColumns; colId++) {
// NOTE: We need to handle the data types for group keys, intermediate aggregate results and final aggregate results.
switch (storedColumnDataTypes[colId]) {
case INT:
values[colId] = dataTable.getInt(rowId, colId);
break;
case LONG:
values[colId] = dataTable.getLong(rowId, colId);
break;
case FLOAT:
values[colId] = dataTable.getFloat(rowId, colId);
break;
case DOUBLE:
values[colId] = dataTable.getDouble(rowId, colId);
break;
case BIG_DECIMAL:
values[colId] = dataTable.getBigDecimal(rowId, colId);
break;
case STRING:
values[colId] = dataTable.getString(rowId, colId);
break;
case BYTES:
values[colId] = dataTable.getBytes(rowId, colId);
break;
case INT_ARRAY:
values[colId] = IntArrayList.wrap(dataTable.getIntArray(rowId, colId));
break;
case LONG_ARRAY:
values[colId] = LongArrayList.wrap(dataTable.getLongArray(rowId, colId));
break;
case FLOAT_ARRAY:
values[colId] = FloatArrayList.wrap(dataTable.getFloatArray(rowId, colId));
break;
case DOUBLE_ARRAY:
values[colId] = DoubleArrayList.wrap(dataTable.getDoubleArray(rowId, colId));
break;
case STRING_ARRAY:
values[colId] = ObjectArrayList.wrap(dataTable.getStringArray(rowId, colId));
break;
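// Aggregation intermediate results are transferred as serialized custom objects. Group-by
// key columns occupy the first _numGroupByExpressions schema slots, so
// (colId - _numGroupByExpressions) indexes the matching aggregation function.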
case OBJECT:
CustomObject customObject = dataTable.getCustomObject(rowId, colId);
if (customObject != null) {
assert _aggregationFunctions != null;
values[colId] =
_aggregationFunctions[colId - _numGroupByExpressions].deserializeIntermediateResult(
customObject);
}
break;
// Add support for other aggregation intermediate result / group-by column types here
default:
throw new IllegalStateException("Unsupported stored column data type: " + storedColumnDataTypes[colId]);
}
}
if (nullHandlingEnabled) {
for (int colId = 0; colId < _numColumns; colId++) {
if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
values[colId] = null;
}
}
}
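// upsert merges records that share the same group key; the table created above is
// assumed to be a concurrent implementation when multiple reduce threads are used.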
indexedTable.upsert(new Record(values));
}
}
} catch (Throwable t) {
exception.compareAndSet(null, t);
} finally {
countDownLatch.countDown();
Tracing.ThreadAccountantOps.clear();
}
}
});
}
try {
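// Spend only the remaining reduce budget on the latch: time already used building the
// groups counts against reducerContext.getReduceTimeOutMs(). A non-positive remainder
// makes await() return false immediately.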
long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out in broker reduce phase");
}
Throwable t = exception.get();
if (t != null) {
Utils.rethrowException(t);
}
} catch (InterruptedException e) {
Exception errorStatus = Tracing.getThreadAccountant().getErrorStatus();
throw new EarlyTerminationException(
"Interrupted in broker reduce phase" + (errorStatus == null ? StringUtils.EMPTY : " " + errorStatus), e);
} finally {
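// Best-effort cleanup: cancel(true) interrupts still-running reduce tasks; the
// interruption is observed by the periodic check inside the row loop.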
for (Future<?> future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
}
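// (Assumption: finish(true, true) sorts/trims the table to its final size and extracts
// final aggregation results from the intermediate ones before the table is returned.)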
indexedTable.finish(true, true);
return indexedTable;
}