in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/processor/aggregate/AggregateProcessor.java [603:820]
/**
 * Converts the given window outputs of one time series into columnar arrays and hands them to
 * the collector row by row.
 *
 * <p>Steps, in order:
 *
 * <ol>
 *   <li>{@code outputs} is sorted in place by timestamp and, for each timestamp, only the first
 *       output in sorted order is kept.
 *   <li>For every name in {@code columnNameStringList} a value column is built. The column's data
 *       type is taken from the first output that contains a result for that name; rows without a
 *       result are marked in the column's {@link BitMap}.
 *   <li>Columns whose bitmap is fully marked (no output carried a result for them) are removed.
 *   <li>One row per distinct timestamp is passed to {@code collector}; the first row is a {@code
 *       PipeResetTabletRow}, all following rows are plain {@code PipeRow}s.
 * </ol>
 *
 * @param outputs the window outputs to emit; may be {@code null} or empty, in which case nothing
 *     is collected. NOTE: the list is sorted in place.
 * @param timeSeries the source time series; prefixed with {@code outputDatabaseWithPathSeparator}
 *     when that prefix is non-empty
 * @param collector the sink receiving the generated rows
 * @throws IOException declared for {@code collector.collectRow}
 * @throws UnsupportedOperationException if an aggregated result has a data type that cannot be
 *     stored in the output columns
 */
public void collectWindowOutputs(
    final List<WindowOutput> outputs, final String timeSeries, final RowCollector collector)
    throws IOException {
  if (Objects.isNull(outputs) || outputs.isEmpty()) {
    return;
  }

  // Sort and same timestamps removal. Comparing against the last kept element (instead of a
  // Long.MIN_VALUE sentinel) makes sure an output stamped Long.MIN_VALUE is not dropped.
  outputs.sort(Comparator.comparingLong(WindowOutput::getTimestamp));
  final List<WindowOutput> distinctOutputs = new ArrayList<>(outputs.size());
  for (final WindowOutput output : outputs) {
    final int distinctCount = distinctOutputs.size();
    if (distinctCount == 0
        || distinctOutputs.get(distinctCount - 1).getTimestamp() != output.getTimestamp()) {
      distinctOutputs.add(output);
    }
  }

  final int rowCount = distinctOutputs.size();
  final int columnCount = columnNameStringList.length;

  final MeasurementSchema[] measurementSchemaList = new MeasurementSchema[columnCount];
  final TSDataType[] valueColumnTypes = new TSDataType[columnCount];
  final Object[] valueColumns = new Object[columnCount];
  final BitMap[] bitMaps = new BitMap[columnCount];

  // Setup timestamps
  final long[] timestampColumn = new long[rowCount];
  for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
    timestampColumn[rowIndex] = distinctOutputs.get(rowIndex).getTimestamp();
  }

  // Build the value columns and their null bitmaps
  for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
    final String columnName = columnNameStringList[columnIndex];
    bitMaps[columnIndex] = new BitMap(rowCount);

    for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
      final Map<String, Pair<TSDataType, Object>> aggregatedResults =
          distinctOutputs.get(rowIndex).getAggregatedResults();
      if (!aggregatedResults.containsKey(columnName)) {
        // No result for this column in this window: mark the cell as null
        bitMaps[columnIndex].mark(rowIndex);
        continue;
      }
      final Pair<TSDataType, Object> result = aggregatedResults.get(columnName);

      if (Objects.isNull(valueColumnTypes[columnIndex])) {
        // Fill in measurements and init columns when the first non-null value is seen
        valueColumnTypes[columnIndex] = result.getLeft();
        measurementSchemaList[columnIndex] =
            new MeasurementSchema(columnName, valueColumnTypes[columnIndex]);
        switch (valueColumnTypes[columnIndex]) {
          case BOOLEAN:
            valueColumns[columnIndex] = new boolean[rowCount];
            break;
          case INT32:
            valueColumns[columnIndex] = new int[rowCount];
            break;
          case DATE:
            valueColumns[columnIndex] = new LocalDate[rowCount];
            break;
          case INT64:
          case TIMESTAMP:
            valueColumns[columnIndex] = new long[rowCount];
            break;
          case FLOAT:
            valueColumns[columnIndex] = new float[rowCount];
            break;
          case DOUBLE:
            valueColumns[columnIndex] = new double[rowCount];
            break;
          case TEXT:
          case BLOB:
          case STRING:
            valueColumns[columnIndex] = new Binary[rowCount];
            break;
          default:
            throw new UnsupportedOperationException(
                String.format(
                    "The output tablet does not support column type %s",
                    valueColumnTypes[columnIndex]));
        }
      }

      // Fill in values
      final Object value = result.getRight();
      switch (valueColumnTypes[columnIndex]) {
        case BOOLEAN:
          ((boolean[]) valueColumns[columnIndex])[rowIndex] = (boolean) value;
          break;
        case INT32:
          ((int[]) valueColumns[columnIndex])[rowIndex] = (int) value;
          break;
        case DATE:
          ((LocalDate[]) valueColumns[columnIndex])[rowIndex] = (LocalDate) value;
          break;
        case INT64:
        case TIMESTAMP:
          ((long[]) valueColumns[columnIndex])[rowIndex] = (long) value;
          break;
        case FLOAT:
          ((float[]) valueColumns[columnIndex])[rowIndex] = (float) value;
          break;
        case DOUBLE:
          ((double[]) valueColumns[columnIndex])[rowIndex] = (double) value;
          break;
        case TEXT:
        case STRING:
          // The aggregated value may already be a Binary or still be a plain String
          ((Binary[]) valueColumns[columnIndex])[rowIndex] =
              value instanceof Binary
                  ? (Binary) value
                  : new Binary((String) value, TSFileConfig.STRING_CHARSET);
          break;
        case BLOB:
          ((Binary[]) valueColumns[columnIndex])[rowIndex] = (Binary) value;
          break;
        default:
          // Report the type of the current column (indexing by rowIndex was wrong)
          throw new UnsupportedOperationException(
              String.format(
                  "The output tablet does not support column type %s",
                  valueColumnTypes[columnIndex]));
      }
    }
  }

  // Filter null outputs: count the columns that have at least one non-null cell
  int filteredCount = 0;
  for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
    if (!bitMaps[columnIndex].isAllMarked()) {
      ++filteredCount;
    }
  }

  // By default every column is emitted as is
  MeasurementSchema[] outputMeasurementSchemaList = measurementSchemaList;
  String[] outputColumnNameStringList = columnNameStringList;
  TSDataType[] outputValueColumnTypes = valueColumnTypes;
  Object[] outputValueColumns = valueColumns;
  BitMap[] outputBitMaps = bitMaps;

  if (filteredCount != columnCount) {
    // Recompute the column arrays, packing the surviving columns into 0-based slots
    outputMeasurementSchemaList = new MeasurementSchema[filteredCount];
    outputColumnNameStringList = new String[filteredCount];
    outputValueColumnTypes = new TSDataType[filteredCount];
    outputValueColumns = new Object[filteredCount];
    outputBitMaps = new BitMap[filteredCount];

    int filteredColumnIndex = 0;
    for (int columnIndex = 0; columnIndex < columnCount; ++columnIndex) {
      if (bitMaps[columnIndex].isAllMarked()) {
        continue;
      }
      outputMeasurementSchemaList[filteredColumnIndex] = measurementSchemaList[columnIndex];
      outputColumnNameStringList[filteredColumnIndex] = columnNameStringList[columnIndex];
      outputValueColumnTypes[filteredColumnIndex] = valueColumnTypes[columnIndex];
      outputBitMaps[filteredColumnIndex] = bitMaps[columnIndex];
      outputValueColumns[filteredColumnIndex] = valueColumns[columnIndex];
      ++filteredColumnIndex;
    }
  }

  final String outputTimeSeries =
      outputDatabaseWithPathSeparator.isEmpty()
          ? timeSeries
          : outputDatabaseWithPathSeparator + timeSeries;

  // Collect rows; only the first row is a PipeResetTabletRow
  for (int rowIndex = 0; rowIndex < rowCount; ++rowIndex) {
    collector.collectRow(
        rowIndex == 0
            ? new PipeResetTabletRow(
                rowIndex,
                outputTimeSeries,
                false,
                outputMeasurementSchemaList,
                timestampColumn,
                outputValueColumnTypes,
                outputValueColumns,
                outputBitMaps,
                outputColumnNameStringList)
            : new PipeRow(
                rowIndex,
                outputTimeSeries,
                false,
                outputMeasurementSchemaList,
                timestampColumn,
                outputValueColumnTypes,
                outputValueColumns,
                outputBitMaps,
                outputColumnNameStringList));
  }
}