in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/out/source/OutStructureSource.java [146:187]
public Flux<TimeSeriesRowModel> parseToRowModel(DeviceModel deviceModel) {
return Flux.deferContextual(
contextView -> {
PipelineContext<ExportModel> pcontext = contextView.get("pipelineContext");
ExportModel exportModel = pcontext.getModel();
String version = contextView.get("VERSION");
return Flux.create(
(Consumer<FluxSink<TimeSeriesRowModel>>)
sink -> {
Session session = exportModel.getSession();
StringBuilder buffer = new StringBuilder();
buffer
.append("show timeseries ")
.append(
ExportPipelineService.formatPath(deviceModel.getDeviceName(), version))
.append(".*");
String sql = buffer.toString();
try {
SessionDataSet timeseriesData = session.executeQueryStatement(sql);
List<String> cloumnNameList = timeseriesData.getColumnNames();
while (timeseriesData.hasNext()) {
TimeSeriesRowModel rowModel = new TimeSeriesRowModel();
rowModel.setDeviceModel(deviceModel);
RowRecord record = timeseriesData.next();
List<IField> iFieldList = new ArrayList<>();
for (String name : cloumnNameList) {
IField iField = new IField();
iField.setColumnName(name);
iField.setField(
FieldCopy.copy(record.getFields().get(cloumnNameList.indexOf(name))));
iFieldList.add(iField);
}
rowModel.setIFieldList(iFieldList);
sink.next(rowModel);
}
sink.complete();
} catch (StatementExecutionException | IoTDBConnectionException e) {
sink.error(e);
}
});
});
}