public Flux parseFluxSqlString()

in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InSqlDataSource.java [128:174]


  /**
   * Streams the SQL statements contained in {@code inputStream}, one emitted element per line,
   * rewriting the path and the column list of each statement for the target version.
   *
   * <p>The version is read from the Reactor context under the key {@code "VERSION"}. Each
   * non-blank line is processed as follows:
   *
   * <ol>
   *   <li>every {@code ;} is removed;
   *   <li>the text from the first {@code "root"} up to the first {@code "("} is passed to {@code
   *       ExportPipelineService.formatPath};
   *   <li>the text between the first {@code "("} and the first {@code ")"} is passed to {@code
   *       this.formatPath};
   *   <li>the pieces are re-assembled and emitted.
   * </ol>
   *
   * <p>Lines presumably look like {@code insert into root.sg.d1(timestamp,s1) values(...)} —
   * NOTE(review): confirm against the writer of these files. Lines are terminated by {@code \n};
   * every {@code \r} is discarded. Blank lines are skipped, and a last line without a trailing
   * newline is still emitted. A non-blank line that lacks {@code "root"}, {@code "("} or {@code
   * ")"} makes {@code substring} throw {@link StringIndexOutOfBoundsException}, which propagates
   * out of the sink callback.
   *
   * <p>After the last statement the sentinel element {@code "finish"} is emitted and the flux
   * completes. The stream is always closed once reading ends.
   *
   * @param inputStream source of the SQL text; decoded as UTF-8 (assumes the files are written in
   *     UTF-8 — TODO confirm) and closed by this method
   * @return a flux of rewritten SQL statements followed by the sentinel {@code "finish"}; an
   *     {@link IOException} raised while reading is delivered as an error signal
   */
  public Flux<String> parseFluxSqlString(InputStream inputStream) {
    return Flux.deferContextual(
        contextView -> {
          String version = contextView.get("VERSION");
          return Flux.create(
              fluxSink -> {
                java.io.Reader reader = null;
                try {
                  // Decode through a Reader: casting single bytes to char corrupts every
                  // multi-byte character (e.g. non-ASCII path names).
                  reader =
                      new java.io.BufferedReader(
                          new java.io.InputStreamReader(
                              inputStream, java.nio.charset.StandardCharsets.UTF_8));
                  StringBuilder sql = new StringBuilder();
                  int ch;
                  do {
                    ch = reader.read();
                    if (ch == '\n' || ch == -1) {
                      // End of line, or end of input with a possibly unterminated last line.
                      String s = sql.toString().replace(";", "");
                      sql.setLength(0);
                      // Blank lines carry no statement; slicing them would throw.
                      if (!s.trim().isEmpty()) {
                        int rootIdx = s.indexOf("root");
                        int openIdx = s.indexOf("(");
                        int closeIdx = s.indexOf(")");
                        String prefix = s.substring(0, rootIdx);
                        String path = s.substring(rootIdx, openIdx);
                        String formattedPath = ExportPipelineService.formatPath(path, version);
                        String needFormat = s.substring(openIdx + 1, closeIdx);
                        String tail = s.substring(closeIdx);
                        fluxSink.next(
                            prefix
                                + formattedPath
                                + "("
                                + this.formatPath(needFormat, version)
                                + tail);
                      }
                    } else if (ch != '\r') {
                      // Carriage returns are dropped so CRLF files behave like LF files.
                      sql.append((char) ch);
                    }
                  } while (ch != -1);
                  // Sentinel element marking the end of the statements.
                  fluxSink.next("finish");
                  fluxSink.complete();
                } catch (IOException e) {
                  fluxSink.error(e);
                } finally {
                  try {
                    // Closing the reader also closes the wrapped input stream.
                    if (reader != null) {
                      reader.close();
                    }
                  } catch (IOException e) {
                    log.error("异常信息:", e);
                  }
                }
              });
        });
  }