private Disposable syncRun()

in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/CommonPipeline.java [67:131]


  private Disposable syncRun(Iterator<Component> it, Disposable disposable) {
    Component component = it.next();
    Flux pipelineFlux = Flux.just("------------------pipeline start------------- \r\n");
    if (disposable != null) {
      Disposable finalDisposable = disposable;
      pipelineFlux =
          pipelineFlux
              .publishOn(Schedulers.newSingle("single"))
              .flatMap(
                  s -> {
                    while (!finalDisposable.isDisposed()) {
                      try {
                        Thread.sleep(1000);
                      } catch (InterruptedException e) {
                        log.error("异常信息:", e);
                      }
                    }
                    return Flux.just(s);
                  });
    }
    pipelineFlux =
        pipelineFlux
            .transform((Function<? super Flux, Flux>) component.execute())
            .contextWrite(
                ctx -> {
                  String version = "13";
                  try {
                    version = getIotdbVersion(pipelineContext.getModel().getSession());
                  } catch (StatementExecutionException | IoTDBConnectionException e) {
                    log.error("获取iotdb version异常", e);
                  }
                  ctx = ((Context) ctx).put("pipelineContext", pipelineContext);
                  ctx = ((Context) ctx).put("VERSION", version);
                  return ctx;
                });

    if (it.hasNext()) {
      disposable = pipelineFlux.subscribe();
      disposableList.add(disposable);
      return syncRun(it, disposable);
    } else {
      disposable =
          pipelineFlux
              .doOnError(
                  e -> {
                    log.error("异常信息:", e);
                    IECommonModel ieCommonModel = pipelineContext.getModel();
                    if (ieCommonModel.getE() != null) {
                      ieCommonModel.getE().accept((Throwable) e);
                    }
                  })
              .doFinally(
                  type -> {
                    SignalType signalType = (SignalType) type;
                    // 提供回调
                    IECommonModel ieCommonModel = pipelineContext.getModel();
                    if (ieCommonModel.getConsumer() != null) {
                      ieCommonModel.getConsumer().accept(signalType);
                    }
                  })
              .subscribe();
      disposableList.add(disposable);
      return disposable;
    }
  }