in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InCompressDataSource.java [138:261]
/**
 * Turns a compressed backup stream into a {@link Flux} of {@link TimeSeriesRowModel}s.
 *
 * <p>Nothing is read until the returned Flux is subscribed. On subscription the method:
 *
 * <ol>
 *   <li>reads the {@code "pipelineContext"} and {@code "VERSION"} entries from the Reactor
 *       context;
 *   <li>parses the header via {@code parseCompressHeader} and derives the device name from the
 *       first column (everything before its last {@code '.'});
 *   <li>runs {@code show devices} to find out whether the device is aligned (treated as not
 *       aligned when the result has no {@code isAligned} column);
 *   <li>runs {@code show timeseries <device>.*} to learn the data type of each timeseries, and
 *       fails with a {@link ParamCheckException} when none is returned;
 *   <li>reads data batches via {@code parseCompressData} until an empty batch is returned and
 *       emits one row model per timestamp;
 *   <li>emits a final marker row whose device name is {@code "finish," + deviceName} and whose
 *       field list is empty, then completes.
 * </ol>
 *
 * <p>{@link IOException}, {@link StatementExecutionException}, {@link IoTDBConnectionException}
 * and {@link ParamCheckException} are delivered through {@code sink.error}. Both query result
 * sets and the input stream are closed once the emitter has run, whether it succeeded or failed.
 *
 * @param in the compressed stream to read; closed by the emitter when it finishes
 * @return a Flux of data rows followed by one "finish" marker row
 */
public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
  return Flux.deferContextual(
      context -> {
        PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
        ImportModel importModel = pcontext.getModel();
        String version = context.get("VERSION");
        return Flux.create(
            (Consumer<FluxSink<TimeSeriesRowModel>>)
                sink -> {
                  try {
                    Pair<String[], String> pair = parseCompressHeader(in);
                    String[] timeseries = pair.getLeft();

                    // Device name = first timeseries path without its last segment.
                    // It stays null when the header lists no timeseries at all.
                    String deviceName = null;
                    if (timeseries.length != 0) {
                      deviceName = timeseries[0].substring(0, timeseries[0].lastIndexOf("."));
                    }
                    // Formatted once and reused for both queries and the error message.
                    String devicePath = ExportPipelineService.formatPath(deviceName, version);

                    // --- 1. device metadata (aligned or not) ---
                    DeviceModel deviceModel = new DeviceModel();
                    SessionDataSet alignedSet =
                        importModel
                            .getSession()
                            .executeQueryStatement(" show devices " + devicePath);
                    try {
                      List<String> deviceColumns = alignedSet.getColumnNames();
                      if (alignedSet.hasNext()) {
                        RowRecord row = alignedSet.next();
                        int position = deviceColumns.indexOf("isAligned");
                        // No "isAligned" column in the result: fall back to "not aligned".
                        String aligned = "false";
                        if (position != -1) {
                          aligned = row.getFields().get(position).getStringValue();
                        }
                        deviceModel.setDeviceName(deviceName);
                        deviceModel.setAligned(Boolean.parseBoolean(aligned));
                      }
                    } finally {
                      // Release the query handle; it was previously never closed.
                      closeDataSetQuietly(alignedSet);
                    }

                    // --- 2. data type of every timeseries under the device ---
                    Map<String, TSDataType> tsDataTypeMap = new HashMap<>();
                    SessionDataSet timeseriesSet =
                        importModel
                            .getSession()
                            .executeQueryStatement("show timeseries " + devicePath + ".*");
                    try {
                      List<String> timeseriesColumns = timeseriesSet.getColumnNames();
                      // Column positions are the same for every row, so resolve them once.
                      int namePosition = timeseriesColumns.indexOf("timeseries");
                      int typePosition = timeseriesColumns.indexOf("dataType");
                      while (timeseriesSet.hasNext()) {
                        RowRecord row = timeseriesSet.next();
                        String measurement = row.getFields().get(namePosition).getStringValue();
                        String type = row.getFields().get(typePosition).getStringValue();
                        tsDataTypeMap.put(
                            measurement, importPipelineService.parseTsDataType(type));
                      }
                    } finally {
                      // Release the query handle; it was previously never closed.
                      closeDataSetQuietly(timeseriesSet);
                    }
                    if (tsDataTypeMap.isEmpty()) {
                      throw new ParamCheckException(
                          "the timeseries of device:" + devicePath + " does not exist");
                    }

                    // --- 3. data batches: data.get(0) = timestamps, data.get(i) = column i-1 ---
                    int columnCount = timeseries.length + 1;
                    while (true) {
                      List<String[]> data =
                          parseCompressData(in, columnCount, pair.getRight());
                      if (data.isEmpty()) {
                        break;
                      }
                      String[] timestamps = data.get(0);
                      List<TimeSeriesRowModel> rowModelList = new ArrayList<>(timestamps.length);
                      for (String times : timestamps) {
                        TimeSeriesRowModel rowModel = new TimeSeriesRowModel();
                        rowModel.setTimestamp(times);
                        rowModel.setDeviceModel(deviceModel);
                        rowModelList.add(rowModel);
                      }
                      for (int i = 1; i < data.size(); i++) {
                        String[] v = data.get(i);
                        // Name and type are per column, not per cell: look them up once.
                        String measurement = timeseries[i - 1];
                        TSDataType type = tsDataTypeMap.get(measurement);
                        for (int j = 0; j < v.length; j++) {
                          TimeSeriesRowModel rowModel = rowModelList.get(j);
                          if (rowModel.getIFieldList() == null) {
                            rowModel.setIFieldList(new ArrayList<>());
                          }
                          String value = v[j];
                          IField iField = new IField();
                          iField.setColumnName(measurement);
                          if (value == null) {
                            // A missing value is kept as an IField with a null Field.
                            iField.setField(null);
                          } else {
                            Field field = new Field(type);
                            field = importPipelineService.generateFieldValue(field, value);
                            iField.setField(FieldCopy.copy(field));
                          }
                          rowModel.getIFieldList().add(iField);
                        }
                      }
                      for (TimeSeriesRowModel rowModel : rowModelList) {
                        sink.next(rowModel);
                      }
                    }

                    // --- 4. end-of-device marker row ---
                    TimeSeriesRowModel finishRowModel = new TimeSeriesRowModel();
                    DeviceModel finishDeviceModel = new DeviceModel();
                    finishDeviceModel.setDeviceName("finish," + deviceName);
                    finishRowModel.setDeviceModel(finishDeviceModel);
                    finishRowModel.setIFieldList(new ArrayList<>());
                    sink.next(finishRowModel);
                    sink.complete();
                  } catch (IOException
                      | StatementExecutionException
                      | IoTDBConnectionException
                      | ParamCheckException e) {
                    sink.error(e);
                  } finally {
                    try {
                      if (in != null) {
                        in.close();
                      }
                    } catch (IOException e) {
                      log.error("异常信息:", e);
                    }
                  }
                });
      });
}

/**
 * Best-effort release of a query result set's operation handle.
 *
 * <p>A failure to close is only logged, so that it can never replace the exception (or the
 * successful result) of the work that used the data set.
 *
 * @param dataSet the result set to close; ignored when {@code null}
 */
private void closeDataSetQuietly(SessionDataSet dataSet) {
  if (dataSet == null) {
    return;
  }
  try {
    dataSet.closeOperationHandle();
  } catch (Exception e) {
    // Cleanup boundary: nothing useful can be done here besides recording the failure.
    log.error("failed to close SessionDataSet", e);
  }
}