in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InStructureSource.java [128:188]
public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
return Flux.deferContextual(
context -> {
PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
ImportModel importModel = pcontext.getModel();
return Flux.create(
(Consumer<FluxSink<TimeSeriesRowModel>>)
sink -> {
try {
CSVParser parser =
CSVFormat.Builder.create(CSVFormat.DEFAULT)
.setHeader()
.setSkipHeaderRecord(true)
.setQuote('\\')
.setEscape('\\')
.setIgnoreEmptyLines(true)
.setQuoteMode(QuoteMode.NONE)
.build()
.parse(new InputStreamReader(in, importModel.getCharSet()));
Iterator<CSVRecord> it = parser.iterator();
Map<String, Integer> fileHeaderMap = parser.getHeaderMap();
while (it.hasNext()) {
CSVRecord record = it.next();
TimeSeriesRowModel rowModel = new TimeSeriesRowModel();
List<IField> iFieldList = new ArrayList<>();
DeviceModel deviceModel = new DeviceModel();
String alignedString =
parseCsvString(record.get(fileHeaderMap.get("aligned")));
deviceModel.setAligned(Boolean.parseBoolean(alignedString));
String deviceId =
parseCsvString(record.get(fileHeaderMap.get("timeseries")));
deviceModel.setDeviceName(deviceId.substring(0, deviceId.lastIndexOf(".")));
for (String header : fileHeaderMap.keySet()) {
if (!"aligned".equals(header)) {
IField iField = new IField();
Field field = new Field(TEXT);
field =
generateFieldValue(field, record.get(fileHeaderMap.get(header)));
iField.setField(FieldCopy.copy(field));
iField.setColumnName(header);
iFieldList.add(iField);
}
}
rowModel.setIFieldList(iFieldList);
rowModel.setDeviceModel(deviceModel);
sink.next(rowModel);
}
sink.complete();
} catch (IOException e) {
sink.error(e);
} finally {
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
}
});
});
}