public Flux parseToRowModel()

in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/out/source/OutStructureSource.java [146:187]


  public Flux<TimeSeriesRowModel> parseToRowModel(DeviceModel deviceModel) {
    return Flux.deferContextual(
        contextView -> {
          PipelineContext<ExportModel> pcontext = contextView.get("pipelineContext");
          ExportModel exportModel = pcontext.getModel();
          String version = contextView.get("VERSION");
          return Flux.create(
              (Consumer<FluxSink<TimeSeriesRowModel>>)
                  sink -> {
                    Session session = exportModel.getSession();
                    StringBuilder buffer = new StringBuilder();
                    buffer
                        .append("show timeseries ")
                        .append(
                            ExportPipelineService.formatPath(deviceModel.getDeviceName(), version))
                        .append(".*");
                    String sql = buffer.toString();
                    try {
                      SessionDataSet timeseriesData = session.executeQueryStatement(sql);
                      List<String> cloumnNameList = timeseriesData.getColumnNames();
                      while (timeseriesData.hasNext()) {
                        TimeSeriesRowModel rowModel = new TimeSeriesRowModel();
                        rowModel.setDeviceModel(deviceModel);
                        RowRecord record = timeseriesData.next();
                        List<IField> iFieldList = new ArrayList<>();
                        for (String name : cloumnNameList) {
                          IField iField = new IField();
                          iField.setColumnName(name);
                          iField.setField(
                              FieldCopy.copy(record.getFields().get(cloumnNameList.indexOf(name))));
                          iFieldList.add(iField);
                        }
                        rowModel.setIFieldList(iFieldList);
                        sink.next(rowModel);
                      }
                      sink.complete();
                    } catch (StatementExecutionException | IoTDBConnectionException e) {
                      sink.error(e);
                    }
                  });
        });
  }