private IndexedTable getIndexedTable()

in pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java [226:380]

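Merges the per-server group-by DataTables into a single IndexedTable on the broker: the data tables are partitioned round-robin across a bounded number of reduce threads, each thread deserializes rows (restoring nulls from the null bitmaps when null handling is enabled) and upserts them into a shared concurrent IndexedTable, and the reduce-phase timeout is enforced by cancelling any tasks still in flight.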

  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
      DataTableReducerContext reducerContext)
      throws TimeoutException {
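    // Record the start time so the remaining reduce time budget can be computed when
    // waiting on the latch below.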
    long start = System.currentTimeMillis();

    assert !dataTablesToReduce.isEmpty();
    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
    int numDataTables = dataTables.size();

    // Get the number of threads to use for reducing.
    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());

    // Create an indexed table to perform the reduce.
    IndexedTable indexedTable =
        GroupByUtils.createIndexedTableForDataTableReducer(dataTables.get(0), _queryContext, reducerContext,
            numReduceThreadsToUse, reducerContext.getExecutorService());

    // Create groups of data tables that each thread can process concurrently.
    // Given that numReduceThreadsToUse <= numDataTables, each group gets at least one data table.
    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
    for (int i = 0; i < numReduceThreadsToUse; i++) {
      reduceGroups.add(new ArrayList<>());
    }
    for (int i = 0; i < numDataTables; i++) {
      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
    }

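    // One future per reduce task (so stragglers can be cancelled), a latch to wait for all
    // tasks to finish, and a holder that keeps only the first failure across threads.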
    Future<?>[] futures = new Future<?>[numReduceThreadsToUse];
    CountDownLatch countDownLatch = new CountDownLatch(numReduceThreadsToUse);
    AtomicReference<Throwable> exception = new AtomicReference<>();
    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
    for (int i = 0; i < numReduceThreadsToUse; i++) {
      List<DataTable> reduceGroup = reduceGroups.get(i);
      int taskId = i;
      ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext();
      futures[i] = reducerContext.getExecutorService().submit(new TraceRunnable() {
        @Override
        public void runJob() {
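          // Register this worker with the thread accountant so it inherits the parent
          // query's execution context for resource accounting and interruption.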
          Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
          try {
            for (DataTable dataTable : reduceGroup) {
              boolean nullHandlingEnabled = _queryContext.isNullHandlingEnabled();
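              // With null handling enabled, fetch the per-column null-row bitmaps up front;
              // nulls are restored after the typed reads below.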
              RoaringBitmap[] nullBitmaps = null;
              if (nullHandlingEnabled) {
                nullBitmaps = new RoaringBitmap[_numColumns];
                for (int colId = 0; colId < _numColumns; colId++) {
                  nullBitmaps[colId] = dataTable.getNullRowIds(colId);
                }
              }

              int numRows = dataTable.getNumberOfRows();
              for (int rowId = 0; rowId < numRows; rowId++) {
                // Periodically check whether this thread has been interrupted, which is
                // expected when the query has already failed in the main thread. The first
                // check always happens at rowId = 0.
                Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rowId);
                Object[] values = new Object[_numColumns];
                for (int colId = 0; colId < _numColumns; colId++) {
                  // NOTE: We need to handle data types for group key, intermediate and final aggregate result.
                  switch (storedColumnDataTypes[colId]) {
                    case INT:
                      values[colId] = dataTable.getInt(rowId, colId);
                      break;
                    case LONG:
                      values[colId] = dataTable.getLong(rowId, colId);
                      break;
                    case FLOAT:
                      values[colId] = dataTable.getFloat(rowId, colId);
                      break;
                    case DOUBLE:
                      values[colId] = dataTable.getDouble(rowId, colId);
                      break;
                    case BIG_DECIMAL:
                      values[colId] = dataTable.getBigDecimal(rowId, colId);
                      break;
                    case STRING:
                      values[colId] = dataTable.getString(rowId, colId);
                      break;
                    case BYTES:
                      values[colId] = dataTable.getBytes(rowId, colId);
                      break;
                    case INT_ARRAY:
                      values[colId] = IntArrayList.wrap(dataTable.getIntArray(rowId, colId));
                      break;
                    case LONG_ARRAY:
                      values[colId] = LongArrayList.wrap(dataTable.getLongArray(rowId, colId));
                      break;
                    case FLOAT_ARRAY:
                      values[colId] = FloatArrayList.wrap(dataTable.getFloatArray(rowId, colId));
                      break;
                    case DOUBLE_ARRAY:
                      values[colId] = DoubleArrayList.wrap(dataTable.getDoubleArray(rowId, colId));
                      break;
                    case STRING_ARRAY:
                      values[colId] = ObjectArrayList.wrap(dataTable.getStringArray(rowId, colId));
                      break;
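                    // OBJECT columns carry serialized intermediate aggregation results. The
                    // aggregation function index is offset by _numGroupByExpressions because
                    // group-by columns precede aggregation columns in the schema.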
                    case OBJECT:
                      CustomObject customObject = dataTable.getCustomObject(rowId, colId);
                      if (customObject != null) {
                        assert _aggregationFunctions != null;
                        values[colId] =
                            _aggregationFunctions[colId - _numGroupByExpressions].deserializeIntermediateResult(
                                customObject);
                      }
                      break;
                    // Add other aggregation intermediate result / group-by column type supports here
                    default:
                      throw new IllegalStateException(
                          "Unsupported stored column data type: " + storedColumnDataTypes[colId]);
                  }
                }
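                // Typed reads return default values for null cells, so overwrite with null
                // wherever the column's null bitmap marks this row.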
                if (nullHandlingEnabled) {
                  for (int colId = 0; colId < _numColumns; colId++) {
                    if (nullBitmaps[colId] != null && nullBitmaps[colId].contains(rowId)) {
                      values[colId] = null;
                    }
                  }
                }
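                // Merge the row into the shared concurrent table, which combines rows with
                // the same group-by key by merging their intermediate aggregation results.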
                indexedTable.upsert(new Record(values));
              }
            }
          } catch (Throwable t) {
            exception.compareAndSet(null, t);
          } finally {
            countDownLatch.countDown();
            Tracing.ThreadAccountantOps.clear();
          }
        }
      });
    }

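    // Wait for all reduce tasks within the remaining time budget; the finally block cancels
    // any tasks still running on timeout, failure, or interruption.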
    try {
      long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
      if (!countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS)) {
        throw new TimeoutException("Timed out in broker reduce phase");
      }
      Throwable t = exception.get();
      if (t != null) {
        Utils.rethrowException(t);
      }
    } catch (InterruptedException e) {
      Exception killedErrorMsg = Tracing.getThreadAccountant().getErrorStatus();
      throw new EarlyTerminationException(
          "Interrupted in broker reduce phase" + (killedErrorMsg == null ? StringUtils.EMPTY : " " + killedErrorMsg),
          e);
    } finally {
      for (Future<?> future : futures) {
        if (!future.isDone()) {
          future.cancel(true);
        }
      }
    }

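    // Seal the table: the two flags request a final sort and extraction of final
    // (rather than intermediate) aggregation results.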
    indexedTable.finish(true, true);
    return indexedTable;
  }
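
The concurrency skeleton above — round-robin partitioning, a CountDownLatch for fan-in, an AtomicReference that records only the first failure, and cancellation of stragglers in a finally block — is independent of Pinot. The following is a minimal, self-contained sketch of that pattern; the names FanOutReduceSketch, reduceConcurrently, and reduceOne are hypothetical, and the Consumer stands in for the thread-safe IndexedTable.upsert.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class FanOutReduceSketch {

  // Round-robin the inputs across numThreads groups, reduce each group on its own thread,
  // record only the first failure, and cancel stragglers on timeout or error.
  static <T> void reduceConcurrently(List<T> inputs, int numThreads, long timeoutMs,
      ExecutorService executor, Consumer<T> reduceOne)
      throws TimeoutException, InterruptedException {
    List<List<T>> groups = new ArrayList<>(numThreads);
    for (int i = 0; i < numThreads; i++) {
      groups.add(new ArrayList<>());
    }
    for (int i = 0; i < inputs.size(); i++) {
      groups.get(i % numThreads).add(inputs.get(i));
    }

    Future<?>[] futures = new Future<?>[numThreads];
    CountDownLatch latch = new CountDownLatch(numThreads);
    AtomicReference<Throwable> firstError = new AtomicReference<>();
    for (int i = 0; i < numThreads; i++) {
      List<T> group = groups.get(i);
      futures[i] = executor.submit(() -> {
        try {
          for (T input : group) {
            if (Thread.currentThread().isInterrupted()) {
              break; // stop promptly once cancelled from the main thread
            }
            reduceOne.accept(input); // must be thread-safe, like IndexedTable.upsert
          }
        } catch (Throwable t) {
          firstError.compareAndSet(null, t); // keep only the first failure
        } finally {
          latch.countDown();
        }
      });
    }

    try {
      if (!latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
        throw new TimeoutException("Timed out in reduce phase");
      }
      Throwable t = firstError.get();
      if (t != null) {
        throw new RuntimeException(t);
      }
    } finally {
      for (Future<?> future : futures) {
        if (!future.isDone()) {
          future.cancel(true); // interrupt anything still running
        }
      }
    }
  }
}

In the method above, reduceOne would correspond to deserializing one DataTable's rows and upserting them into the shared table; the latch-then-cancel structure mirrors the await/finally block exactly.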