public void collectWindowOutputs()

in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java [603:820]


  /**
   * Converts the aggregated window outputs of one time series into tablet rows and passes them to
   * the given collector.
   *
   * <p>The outputs are sorted by timestamp in place and only the first output of each timestamp is
   * kept. One value column is built per entry of {@code columnNameStringList}; the data type of a
   * column is taken from the first output that carries a value for it, and rows without a value
   * for a column are marked in that column's bitmap. Columns for which no row has a value are
   * removed before the rows are emitted. The first row is emitted as a {@code PipeResetTabletRow}
   * and every following row as a {@code PipeRow}.
   *
   * @param outputs the window outputs to emit; nothing happens when this is {@code null} or empty.
   *     The list is sorted in place.
   * @param timeSeries the time series the outputs belong to; it is prefixed with {@code
   *     outputDatabaseWithPathSeparator} when that prefix is not empty
   * @param collector the collector that receives the generated rows
   * @throws IOException propagated from the collector
   * @throws UnsupportedOperationException if an aggregated result has a data type that the output
   *     tablet does not support
   */
  public void collectWindowOutputs(
      final List<WindowOutput> outputs, final String timeSeries, final RowCollector collector)
      throws IOException {
    if (Objects.isNull(outputs) || outputs.isEmpty()) {
      return;
    }

    // Sort, then keep only the first output of each timestamp
    outputs.sort(Comparator.comparingLong(WindowOutput::getTimestamp));
    final List<WindowOutput> distinctOutputs = new ArrayList<>(outputs.size());
    for (final WindowOutput output : outputs) {
      // Compare against the last kept output instead of a sentinel value, so that an output whose
      // timestamp is Long.MIN_VALUE is not mistaken for a duplicate and dropped
      if (distinctOutputs.isEmpty()
          || distinctOutputs.get(distinctOutputs.size() - 1).getTimestamp()
              != output.getTimestamp()) {
        distinctOutputs.add(output);
      }
    }

    final int rowCount = distinctOutputs.size();
    final int columnCount = columnNameStringList.length;

    final MeasurementSchema[] measurementSchemaList = new MeasurementSchema[columnCount];
    final TSDataType[] valueColumnTypes = new TSDataType[columnCount];
    final Object[] valueColumns = new Object[columnCount];
    final BitMap[] bitMaps = new BitMap[columnCount];

    // Setup timestamps
    final long[] timestampColumn = new long[rowCount];
    for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
      timestampColumn[rowIndex] = distinctOutputs.get(rowIndex).getTimestamp();
    }

    for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
      final String columnName = columnNameStringList[columnIndex];
      bitMaps[columnIndex] = new BitMap(rowCount);
      for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
        final Pair<TSDataType, Object> aggregatedResult =
            distinctOutputs.get(rowIndex).getAggregatedResults().get(columnName);
        if (Objects.isNull(aggregatedResult)) {
          // No value for this column in this row
          bitMaps[columnIndex].mark(rowIndex);
          continue;
        }

        if (Objects.isNull(valueColumnTypes[columnIndex])) {
          // Fill in measurements and init columns when the first non-null value is seen
          valueColumnTypes[columnIndex] = aggregatedResult.getLeft();
          measurementSchemaList[columnIndex] =
              new MeasurementSchema(columnName, valueColumnTypes[columnIndex]);
          switch (valueColumnTypes[columnIndex]) {
            case BOOLEAN:
              valueColumns[columnIndex] = new boolean[rowCount];
              break;
            case INT32:
              valueColumns[columnIndex] = new int[rowCount];
              break;
            case DATE:
              valueColumns[columnIndex] = new LocalDate[rowCount];
              break;
            case INT64:
            case TIMESTAMP:
              valueColumns[columnIndex] = new long[rowCount];
              break;
            case FLOAT:
              valueColumns[columnIndex] = new float[rowCount];
              break;
            case DOUBLE:
              valueColumns[columnIndex] = new double[rowCount];
              break;
            case TEXT:
            case BLOB:
            case STRING:
              valueColumns[columnIndex] = new Binary[rowCount];
              break;
            default:
              throw new UnsupportedOperationException(
                  String.format(
                      "The output tablet does not support column type %s",
                      valueColumnTypes[columnIndex]));
          }
        }

        // Fill in values
        final Object value = aggregatedResult.getRight();
        switch (valueColumnTypes[columnIndex]) {
          case BOOLEAN:
            ((boolean[]) valueColumns[columnIndex])[rowIndex] = (boolean) value;
            break;
          case INT32:
            ((int[]) valueColumns[columnIndex])[rowIndex] = (int) value;
            break;
          case DATE:
            ((LocalDate[]) valueColumns[columnIndex])[rowIndex] = (LocalDate) value;
            break;
          case INT64:
          case TIMESTAMP:
            ((long[]) valueColumns[columnIndex])[rowIndex] = (long) value;
            break;
          case FLOAT:
            ((float[]) valueColumns[columnIndex])[rowIndex] = (float) value;
            break;
          case DOUBLE:
            ((double[]) valueColumns[columnIndex])[rowIndex] = (double) value;
            break;
          case TEXT:
          case STRING:
            // Textual results may arrive either as Binary or as String
            ((Binary[]) valueColumns[columnIndex])[rowIndex] =
                value instanceof Binary
                    ? (Binary) value
                    : new Binary((String) value, TSFileConfig.STRING_CHARSET);
            break;
          case BLOB:
            ((Binary[]) valueColumns[columnIndex])[rowIndex] = (Binary) value;
            break;
          default:
            // The type array is indexed by column, not by row
            throw new UnsupportedOperationException(
                String.format(
                    "The output tablet does not support column type %s",
                    valueColumnTypes[columnIndex]));
        }
      }
    }

    // Filter null outputs: map each column that has at least one value to its position among the
    // kept columns. The positions must be 0-based because they index the filtered arrays below.
    final Integer[] originColumnIndex2FilteredColumnIndexMapperList = new Integer[columnCount];
    int filteredCount = 0;
    for (int i = 0; i < columnCount; ++i) {
      if (!bitMaps[i].isAllMarked()) {
        originColumnIndex2FilteredColumnIndexMapperList[i] = filteredCount++;
      }
    }

    final String outputTimeSeries =
        outputDatabaseWithPathSeparator.isEmpty()
            ? timeSeries
            : outputDatabaseWithPathSeparator + timeSeries;

    final MeasurementSchema[] outputMeasurementSchemaList;
    final String[] outputColumnNameStringList;
    final TSDataType[] outputValueColumnTypes;
    final Object[] outputValueColumns;
    final BitMap[] outputBitMaps;

    if (filteredCount == columnCount) {
      // No filter, the column arrays can be used as they are
      outputMeasurementSchemaList = measurementSchemaList;
      outputColumnNameStringList = columnNameStringList;
      outputValueColumnTypes = valueColumnTypes;
      outputValueColumns = valueColumns;
      outputBitMaps = bitMaps;
    } else {
      // Recompute the column arrays without the all-null columns
      outputMeasurementSchemaList = new MeasurementSchema[filteredCount];
      outputColumnNameStringList = new String[filteredCount];
      outputValueColumnTypes = new TSDataType[filteredCount];
      outputValueColumns = new Object[filteredCount];
      outputBitMaps = new BitMap[filteredCount];

      for (int i = 0; i < columnCount; ++i) {
        if (originColumnIndex2FilteredColumnIndexMapperList[i] != null) {
          final int filteredColumnIndex = originColumnIndex2FilteredColumnIndexMapperList[i];
          outputMeasurementSchemaList[filteredColumnIndex] = measurementSchemaList[i];
          outputColumnNameStringList[filteredColumnIndex] = columnNameStringList[i];
          outputValueColumnTypes[filteredColumnIndex] = valueColumnTypes[i];
          outputBitMaps[filteredColumnIndex] = bitMaps[i];
          outputValueColumns[filteredColumnIndex] = valueColumns[i];
        }
      }
    }

    // Collect rows, the first one as a PipeResetTabletRow and the rest as PipeRow
    for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
      collector.collectRow(
          rowIndex == 0
              ? new PipeResetTabletRow(
                  rowIndex,
                  outputTimeSeries,
                  false,
                  outputMeasurementSchemaList,
                  timestampColumn,
                  outputValueColumnTypes,
                  outputValueColumns,
                  outputBitMaps,
                  outputColumnNameStringList)
              : new PipeRow(
                  rowIndex,
                  outputTimeSeries,
                  false,
                  outputMeasurementSchemaList,
                  timestampColumn,
                  outputValueColumnTypes,
                  outputValueColumns,
                  outputBitMaps,
                  outputColumnNameStringList));
    }
  }