in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/out/sink/OutTsfileDataSink.java [114:240]
private void doSink(List<TimeSeriesRowModel> bufferList) {
totalFileNum = totalSize[0];
int virtualSGNum = pcontext.getModel().getVirutalStorageGroupNum();
long partitionInterval = pcontext.getModel().getPartitionInterval();
Map<String, List<TimeSeriesRowModel>> groupByDeviceIdMap =
bufferList.stream()
.collect(
Collectors.toMap(
k -> k.getDeviceModel().getDeviceName(),
p -> {
List<TimeSeriesRowModel> result = new ArrayList();
result.add(p);
return result;
},
(o, n) -> {
o.addAll(n);
return o;
}));
for (String deviceIdKey : groupByDeviceIdMap.keySet()) {
if (deviceIdKey.startsWith("finish")) {
finishedFileNum.incrementAndGet();
continue;
}
Map<String, List<TimeSeriesRowModel>> groupByTsfileNameKeyMap =
groupByDeviceIdMap.get(deviceIdKey).stream()
.collect(
Collectors.toMap(
k ->
getTsfileName(
k.getDeviceModel().getDeviceName(),
k.getTimestamp(),
virtualSGNum,
partitionInterval),
p -> {
List<TimeSeriesRowModel> result = new ArrayList();
result.add(p);
return result;
},
(o, n) -> {
o.addAll(n);
return o;
}));
for (String tsfileNameKey : groupByTsfileNameKeyMap.keySet()) {
List<TimeSeriesRowModel> groupByTsfileNameKeyList =
groupByTsfileNameKeyMap.get(tsfileNameKey);
// 需要 根据deviceName 来判断他属于那个tsfile
TsFileWriter tsFileWriter =
getTsfileWriter(
tsfileWriterMap,
schemaMap,
deviceIdKey,
tsfileNameKey,
pcontext.getModel().getFileFolder());
Schema schema = schemaMap.get(tsfileNameKey);
registDevice(tsFileWriter, schema, deviceInfoMap, deviceIdKey);
// 需要转化grouplist数据为tsfile可以写入的数据
TimeSeriesRowModel firstRow = groupByTsfileNameKeyList.get(0);
DeviceModel deviceModel = firstRow.getDeviceModel();
List<MeasurementSchema> measurementSchemaList =
firstRow.getIFieldList().stream()
.map(
iField -> {
String columnName =
iField
.getColumnName()
.substring(deviceModel.getDeviceName().length() + 1);
TSDataType tsDataType = iField.getTsDataType();
MeasurementSchema measurementSchema =
new MeasurementSchema(columnName, tsDataType);
return measurementSchema;
})
.collect(Collectors.toList());
Tablet tablet = new Tablet(deviceModel.getDeviceName(), measurementSchemaList);
tablet.initBitMaps();
groupByTsfileNameKeyList.stream()
.forEach(
model -> {
List<FieldCopy> fields =
model.getIFieldList().stream()
.map(iField -> iField.getField())
.collect(Collectors.toList());
int rowIndex = tablet.rowSize++;
tablet.addTimestamp(rowIndex, Long.parseLong(model.getTimestamp()));
for (int i = 0; i < fields.size(); ) {
List<MeasurementSchema> schemas = tablet.getSchemas();
for (int j = 0; j < schemas.size(); j++) {
MeasurementSchema measurementSchema = schemas.get(j);
Object value = fields.get(i).getObjectValue(measurementSchema.getType());
if (value == null) {
tablet.bitMaps[j].mark(rowIndex);
}
tablet.addValue(measurementSchema.getMeasurementId(), rowIndex, value);
i++;
}
}
if (tablet.rowSize == tablet.getMaxRowNumber()) {
try {
exportPipelineService.syncWriteByTsfileWriter(
tsFileWriter, tablet, deviceModel.isAligned());
finishedRowNum.addAndGet(tablet.rowSize);
tablet.initBitMaps();
tablet.reset();
} catch (IOException | WriteProcessException e) {
e.printStackTrace();
}
}
});
try {
if (tablet.rowSize != 0) {
exportPipelineService.syncWriteByTsfileWriter(
tsFileWriter, tablet, deviceModel.isAligned());
finishedRowNum.addAndGet(tablet.rowSize);
}
tsFileWriter.flushAllChunkGroups();
} catch (IOException | WriteProcessException e) {
e.printStackTrace();
}
}
}
bufferList.clear();
}