in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InSqlDataSource.java [128:174]
/**
 * Streams the statements contained in {@code inputStream}, one emitted element per non-blank
 * line, with the path portions of each statement re-formatted for the version found under the
 * {@code "VERSION"} key of the Reactor subscriber context.
 *
 * <p>The stream is decoded as UTF-8 (NOTE(review): assumes the file was written as UTF-8 —
 * confirm against the exporting side). Lines are split on {@code \n}, {@code \r\n} or {@code \r};
 * blank lines are skipped, and a final line without a trailing line terminator is still emitted.
 * After the last statement the literal marker {@code "finish"} is emitted and the flux completes.
 *
 * <p>An {@link IOException} raised while reading is signalled through the sink's error channel.
 * The stream is always closed once reading ends, whether normally or exceptionally; a failure to
 * close is only logged.
 *
 * @param inputStream source of the statements, one per line; closed by this method
 * @return a flux of rewritten statements followed by the {@code "finish"} marker
 */
public Flux<String> parseFluxSqlString(InputStream inputStream) {
  return Flux.deferContextual(
      contextView -> {
        String version = contextView.get("VERSION");
        return Flux.create(
            fluxSink -> {
              try {
                // Decode through a Reader: casting single bytes to char corrupts every
                // multi-byte UTF-8 sequence. Fully-qualified names keep this block free of
                // new file-level imports.
                java.io.BufferedReader reader =
                    new java.io.BufferedReader(
                        new java.io.InputStreamReader(
                            inputStream, java.nio.charset.StandardCharsets.UTF_8));
                String line;
                while ((line = reader.readLine()) != null) {
                  // A blank line holds no statement; feeding it to the rewriter would fail
                  // on the missing "root" marker and abort the whole stream.
                  if (line.trim().isEmpty()) {
                    continue;
                  }
                  fluxSink.next(rewriteStatementPaths(line, version));
                }
                // Sentinel telling downstream that every statement has been emitted.
                fluxSink.next("finish");
                fluxSink.complete();
              } catch (IOException e) {
                fluxSink.error(e);
              } finally {
                try {
                  // Closing the underlying stream is sufficient; the reader wrappers hold no
                  // other resources.
                  if (inputStream != null) {
                    inputStream.close();
                  }
                } catch (IOException e) {
                  log.error("异常信息:", e);
                }
              }
            });
      });
}

/**
 * Rewrites one statement line of the shape {@code <prefix>root...(<list>)<rest>}: all
 * {@code ';'} characters are removed, the text from the first {@code "root"} up to the first
 * {@code '('} is passed through {@code ExportPipelineService.formatPath}, and the text between
 * the first {@code '('} and the first {@code ')'} is passed through {@code this.formatPath}.
 * Everything else is copied unchanged.
 *
 * @param line one raw line, without its line terminator
 * @param version version string handed to both path formatters
 * @return the rewritten statement
 * @throws StringIndexOutOfBoundsException if {@code "root"}, {@code '('} and {@code ')'} are not
 *     all present in that order
 */
private String rewriteStatementPaths(String line, String version) {
  String statement = line.replace(";", "");
  int rootStart = statement.indexOf("root");
  int listOpen = statement.indexOf("(");
  int listClose = statement.indexOf(")");

  String prefix = statement.substring(0, rootStart);
  String path = statement.substring(rootStart, listOpen);
  String list = statement.substring(listOpen + 1, listClose);
  // Starts at ')' so the closing parenthesis is carried over with the remainder.
  String remainder = statement.substring(listClose);

  return new StringBuilder()
      .append(prefix)
      .append(ExportPipelineService.formatPath(path, version))
      .append("(")
      .append(this.formatPath(list, version))
      .append(remainder)
      .toString();
}