in iotdb-collector/collector-core/src/main/java/org/apache/iotdb/collector/utils/builder/PipeTreeModelTsFileBuilder.java [197:267]
/**
 * For each device, takes the longest leading run of queued tablets that do not overlap in time
 * and writes that run through {@code fileWriter}.
 *
 * <p>A tablet is taken from the head of a device's list only while its first timestamp is
 * strictly greater than the last timestamp of the tablet taken just before it. The first tablet
 * that breaks this order stays in the list, together with everything queued behind it. A device
 * whose list becomes empty is removed from {@code device2TabletsLinkedList}; entries still
 * present after this call therefore hold tablets that were not written by it.
 *
 * @param device2TabletsLinkedList per-device tablet queues; both the map and its lists are
 *     mutated in place
 * @param device2Aligned aligned flag per device id; the value is unboxed directly, so a missing
 *     entry fails with a {@link NullPointerException}
 * @throws IOException declared by this method; presumably raised by the underlying writer
 * @throws WriteProcessException if aligned registration or any write fails; registration
 *     failures on the non-aligned path are ignored
 */
private void tryBestToWriteTabletsIntoOneFile(
    final LinkedHashMap<String, LinkedList<Tablet>> device2TabletsLinkedList,
    final Map<String, Boolean> device2Aligned)
    throws IOException, WriteProcessException {
  final Iterator<Map.Entry<String, LinkedList<Tablet>>> deviceCursor =
      device2TabletsLinkedList.entrySet().iterator();

  while (deviceCursor.hasNext()) {
    final Map.Entry<String, LinkedList<Tablet>> deviceEntry = deviceCursor.next();
    final String deviceId = deviceEntry.getKey();
    final LinkedList<Tablet> pending = deviceEntry.getValue();

    // Collect the leading tablets whose time ranges are strictly increasing.
    final List<Tablet> batch = new ArrayList<>();
    Tablet previous = null;
    while (!pending.isEmpty()) {
      final Tablet head = pending.peekFirst();
      // The row-size-minus-one index relies on the previous tablet having at least one row.
      if (Objects.nonNull(previous)
          && previous.getTimestamp(previous.getRowSize() - 1) >= head.getTimestamp(0)) {
        // Stop at the first tablet that is not strictly later; it stays queued.
        break;
      }
      batch.add(head);
      previous = head;
      pending.pollFirst();
    }

    // Nothing left for this device: drop it from the map via the iterator.
    if (pending.isEmpty()) {
      deviceCursor.remove();
    }

    if (!device2Aligned.get(deviceId)) {
      // Non-aligned path: register every measurement of a tablet, then write that tablet.
      for (final Tablet tablet : batch) {
        for (final IMeasurementSchema schema : tablet.getSchemas()) {
          try {
            fileWriter.registerTimeseries(
                IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), schema);
          } catch (final WriteProcessException ignored) {
            // Do nothing if the timeSeries has been registered
          }
        }
        fileWriter.writeTree(tablet);
      }
      continue;
    }

    // Aligned path: gather the schemas of all tablets per device id, register them in one call
    // per device, and only then write the tablets.
    final Map<String, List<IMeasurementSchema>> schemasByDevice = new HashMap<>();
    for (final Tablet tablet : batch) {
      schemasByDevice
          .computeIfAbsent(tablet.getDeviceId(), unused -> new ArrayList<>())
          .addAll(tablet.getSchemas());
    }
    for (final Map.Entry<String, List<IMeasurementSchema>> registration :
        schemasByDevice.entrySet()) {
      fileWriter.registerAlignedTimeseries(
          new Path(registration.getKey()), registration.getValue());
    }
    for (final Tablet tablet : batch) {
      fileWriter.writeAligned(tablet);
    }
  }
}