public Flux parseToDeviceModel()

in backup-core/src/main/java/org/apache/iotdb/backup/core/service/ExportPipelineService.java [115:163]


  /**
   * Builds a demand-driven stream of the devices found under the export path.
   *
   * <p>On subscription the method reads the {@code "pipelineContext"} and {@code "VERSION"}
   * entries from the Reactor context, runs {@code show devices <path>} on the model's session and
   * emits one {@link DeviceModel} per returned row. Rows are pulled from the result set only as
   * downstream requests them.
   *
   * <p>Failure policy: a failure while executing the query or while reading a row is logged and
   * the stream is then <em>completed</em> (not errored), so subscribers never receive
   * {@code onError} for IoTDB statement/connection failures from this method. The result set is
   * closed when the stream completes or is cancelled.
   *
   * @return a cold {@code Flux} of the devices matching the export path; empty if the query fails
   */
  public Flux<DeviceModel> parseToDeviceModel() {
    return Flux.deferContextual(
        context -> {
          PipelineContext<ExportModel> pcontext = context.get("pipelineContext");
          ExportModel exportModel = pcontext.getModel();
          String version = context.get("VERSION");
          return Flux.create(
              sink -> {
                String sql = "show devices " + formatPath(exportModel.getIotdbPath(), version);
                try {
                  SessionDataSet deviceData = exportModel.getSession().executeQueryStatement(sql);
                  // Column positions are fixed for the whole result set; resolve them once
                  // instead of once per row.
                  int deviceIndex = deviceData.getColumnNames().indexOf("devices");
                  int alignedIndex = deviceData.getColumnNames().indexOf("isAligned");

                  // Release the server-side query handle on complete, error or cancel.
                  sink.onDispose(
                      () -> {
                        try {
                          deviceData.closeOperationHandle();
                        } catch (StatementExecutionException | IoTDBConnectionException e) {
                          log.error("Failed to close device query result set, SQL:{}", sql, e);
                        }
                      });

                  sink.onRequest(
                      n -> {
                        // 'n' may be Long.MAX_VALUE (unbounded demand), so the counter must be a
                        // long; an int counter would overflow and never reach 'n'.
                        for (long i = 0; i < n && !sink.isCancelled(); i++) {
                          try {
                            if (!deviceData.hasNext()) {
                              sink.complete();
                              // Stop pulling: nothing may be emitted after completion.
                              return;
                            }
                            sink.next(toDeviceModel(deviceData.next(), deviceIndex, alignedIndex));
                          } catch (StatementExecutionException | IoTDBConnectionException e) {
                            log.error("异常信息:", e);
                            // Terminate instead of retrying a broken result set; completing
                            // (rather than erroring) mirrors the query-failure handling below.
                            sink.complete();
                            return;
                          }
                        }
                      });
                } catch (StatementExecutionException | IoTDBConnectionException e) {
                  log.error("异常信息对应的SQL:{}", sql, e);
                  sink.complete();
                }
              });
        });
  }

  /**
   * Converts one row of a {@code show devices} result into a {@link DeviceModel}.
   *
   * @param rowRecord the row to convert
   * @param deviceIndex position of the {@code "devices"} column; must be a valid field index
   * @param alignedIndex position of the {@code "isAligned"} column, or {@code -1} when the result
   *     set has no such column, in which case the device is treated as not aligned
   * @return the populated device model
   */
  private DeviceModel toDeviceModel(RowRecord rowRecord, int deviceIndex, int alignedIndex) {
    String deviceFullName = rowRecord.getFields().get(deviceIndex).getStringValue();
    String isAligned = "false";
    if (alignedIndex != -1) {
      isAligned = rowRecord.getFields().get(alignedIndex).getStringValue();
    }
    DeviceModel deviceModel = new DeviceModel();
    deviceModel.setDeviceName(deviceFullName);
    deviceModel.setAligned(Boolean.parseBoolean(isAligned));
    return deviceModel;
  }