in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InCsvDataSource.java [133:264]
public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
return Flux.deferContextual(
context -> {
PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
ImportModel importModel = pcontext.getModel();
String version = context.get("VERSION");
return Flux.create(
(Consumer<FluxSink<TimeSeriesRowModel>>)
sink -> {
try {
CSVParser parser =
CSVFormat.Builder.create(CSVFormat.DEFAULT)
.setHeader()
.setSkipHeaderRecord(true)
.setQuote('\\')
.setEscape('\\')
.setIgnoreEmptyLines(true)
.setQuoteMode(QuoteMode.NONE)
.build()
.parse(new InputStreamReader(in, importModel.getCharSet()));
Iterator<CSVRecord> it = parser.iterator();
Map<String, Integer> fileHeaderMap = parser.getHeaderMap();
String entityPath = null;
for (String header : fileHeaderMap.keySet()) {
if ("Time".equals(header)) {
continue;
} else {
entityPath = header.substring(0, header.lastIndexOf("."));
}
}
// 存在entitypath为空的情况,也就是csv中只有time一列的时候
if (entityPath != null) {
StringBuilder sql = new StringBuilder();
sql.append(" show devices ")
.append(ExportPipelineService.formatPath(entityPath, version));
SessionDataSet alignedSet =
importModel.getSession().executeQueryStatement(sql.toString());
List<String> columnNameList = alignedSet.getColumnNames();
DeviceModel deviceModel = new DeviceModel();
if (alignedSet.hasNext()) {
RowRecord record = alignedSet.next();
int position = columnNameList.indexOf("isAligned");
String aligned = "false";
if (position != -1) {
aligned = record.getFields().get(position).getStringValue();
}
deviceModel.setDeviceName(entityPath);
deviceModel.setAligned(Boolean.parseBoolean(aligned));
}
sql.delete(0, sql.length());
sql.append("show timeseries ")
.append(ExportPipelineService.formatPath(entityPath, version))
.append(".*");
SessionDataSet timeseriesSet =
importModel.getSession().executeQueryStatement(sql.toString());
columnNameList = timeseriesSet.getColumnNames();
Map<String, TSDataType> tsDataTypeMap = new HashMap<>();
while (timeseriesSet.hasNext()) {
RowRecord record = timeseriesSet.next();
int position = columnNameList.indexOf("timeseries");
String timeseries = record.getFields().get(position).getStringValue();
position = columnNameList.indexOf("dataType");
String type = record.getFields().get(position).getStringValue();
tsDataTypeMap.put(
timeseries, importPipelineService.parseTsDataType(type));
}
if (tsDataTypeMap.size() == 0) {
throw new ParamCheckException(
"the timeseries of device:"
+ ExportPipelineService.formatPath(entityPath, version)
+ "do not exist");
}
while (it.hasNext()) {
TimeSeriesRowModel timeSeriesRowModel = new TimeSeriesRowModel();
timeSeriesRowModel.setDeviceModel(deviceModel);
CSVRecord record = it.next();
for (String header : fileHeaderMap.keySet()) {
if ("Time".equals(header)) {
timeSeriesRowModel.setTimestamp(
record.get(fileHeaderMap.get(header)));
continue;
}
IField iField = new IField();
iField.setColumnName(header);
Field field = new Field(tsDataTypeMap.get(header));
try {
field =
importPipelineService.generateFieldValue(
field, record.get(fileHeaderMap.get(header)));
} catch (Exception e) {
log.error("异常信息:", e);
}
iField.setField(FieldCopy.copy(field));
if (timeSeriesRowModel.getIFieldList() == null) {
timeSeriesRowModel.setIFieldList(new ArrayList<>());
}
timeSeriesRowModel.getIFieldList().add(iField);
}
sink.next(timeSeriesRowModel);
}
TimeSeriesRowModel finishRowModel = new TimeSeriesRowModel();
DeviceModel finishDeviceModel = new DeviceModel();
StringBuilder builder = new StringBuilder();
builder.append("finish,").append(deviceModel.getDeviceName());
finishDeviceModel.setDeviceName(builder.toString());
finishRowModel.setDeviceModel(finishDeviceModel);
finishRowModel.setIFieldList(new ArrayList<>());
sink.next(finishRowModel);
}
sink.complete();
} catch (IOException
| StatementExecutionException
| IoTDBConnectionException
| ParamCheckException e) {
sink.error(e);
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException e) {
log.error("异常信息:", e);
}
}
});
});
}