in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/CommonPipeline.java [67:131]
/**
 * Subscribes the components supplied by {@code it} one after another, starting each stage only
 * after the previous stage's subscription reports {@code isDisposed()}.
 *
 * <p>For every stage a small seed Flux is created and transformed by {@code component.execute()}.
 * The Reactor context of the stage is populated with {@code "pipelineContext"} and {@code
 * "VERSION"} (falls back to {@code "13"} when the version lookup fails). Every subscription that
 * is created is appended to {@code disposableList}.
 *
 * <p>Only the last stage gets the model callbacks: the error consumer ({@code getE()}) in {@code
 * doOnError} and the completion consumer ({@code getConsumer()}) in {@code doFinally}.
 *
 * @param it iterator over the remaining components; must have at least one element, otherwise
 *     {@link Iterator#next()} throws {@link java.util.NoSuchElementException}
 * @param disposable subscription of the previous stage, or {@code null} for the first stage
 * @return the subscription of the last stage
 */
private Disposable syncRun(Iterator<Component> it, Disposable disposable) {
  Component component = it.next();
  Flux pipelineFlux = Flux.just("------------------pipeline start------------- \r\n");

  // A dedicated single-thread scheduler is only needed when there is a previous stage to wait for.
  // It is kept in a variable so it can be released once this stage terminates.
  final reactor.core.scheduler.Scheduler waitScheduler =
      disposable == null ? null : Schedulers.newSingle("single");

  if (disposable != null) {
    final Disposable finalDisposable = disposable;
    pipelineFlux =
        pipelineFlux
            .publishOn(waitScheduler)
            .flatMap(
                s -> {
                  // Block this stage's own thread until the previous stage has finished.
                  while (!finalDisposable.isDisposed()) {
                    try {
                      Thread.sleep(1000);
                    } catch (InterruptedException e) {
                      log.error("异常信息:", e);
                      // Restore the flag and abort the stage: continuing to poll would ignore the
                      // interruption, and proceeding would start before the previous stage ended.
                      Thread.currentThread().interrupt();
                      return Flux.error(e);
                    }
                  }
                  return Flux.just(s);
                });
  }

  pipelineFlux =
      pipelineFlux
          .transform((Function<? super Flux, Flux>) component.execute())
          .contextWrite(
              ctx -> {
                String version = "13";
                try {
                  version = getIotdbVersion(pipelineContext.getModel().getSession());
                } catch (StatementExecutionException | IoTDBConnectionException e) {
                  log.error("获取iotdb version异常", e);
                }
                ctx = ((Context) ctx).put("pipelineContext", pipelineContext);
                ctx = ((Context) ctx).put("VERSION", version);
                return ctx;
              });

  if (waitScheduler != null) {
    // Release the per-stage thread when the stage completes, errors or is cancelled; otherwise
    // every chained stage would leave a live thread behind.
    pipelineFlux = pipelineFlux.doFinally(type -> waitScheduler.dispose());
  }

  if (it.hasNext()) {
    disposable = pipelineFlux.subscribe();
    disposableList.add(disposable);
    return syncRun(it, disposable);
  } else {
    disposable =
        pipelineFlux
            .doOnError(
                e -> {
                  log.error("异常信息:", e);
                  IECommonModel ieCommonModel = pipelineContext.getModel();
                  if (ieCommonModel.getE() != null) {
                    ieCommonModel.getE().accept((Throwable) e);
                  }
                })
            .doFinally(
                type -> {
                  SignalType signalType = (SignalType) type;
                  // Provide the completion callback to the caller.
                  IECommonModel ieCommonModel = pipelineContext.getModel();
                  if (ieCommonModel.getConsumer() != null) {
                    ieCommonModel.getConsumer().accept(signalType);
                  }
                })
            .subscribe();
    disposableList.add(disposable);
    return disposable;
  }
}