in backup-core/src/main/java/org/apache/iotdb/backup/core/service/ExportPipelineService.java [115:163]
/**
 * Streams the devices returned by a {@code show devices} query as {@link DeviceModel}s.
 *
 * <p>The {@code PipelineContext} (key {@code "pipelineContext"}) and the version string (key
 * {@code "VERSION"}) are read from the Reactor subscriber context. The query is executed once
 * per subscription and rows are pulled from the result set on demand: each downstream request
 * of {@code n} emits at most {@code n} devices.
 *
 * <p>Failures are handled best-effort: if the query cannot be executed, or reading a row fails,
 * the problem is logged and the stream completes (no error signal is sent downstream). The
 * result set is released when the stream completes, errors or is cancelled.
 *
 * @return a cold {@code Flux} emitting one {@code DeviceModel} (name and aligned flag) per row
 */
public Flux<DeviceModel> parseToDeviceModel() {
  return Flux.deferContextual(
      context -> {
        PipelineContext<ExportModel> pcontext = context.get("pipelineContext");
        ExportModel exportModel = pcontext.getModel();
        String version = context.get("VERSION");
        return Flux.create(
            sink -> {
              String sql = "show devices " + formatPath(exportModel.getIotdbPath(), version);
              try {
                SessionDataSet deviceData = exportModel.getSession().executeQueryStatement(sql);

                // Release the server-side query handle on complete, error or cancel.
                // Registered before onRequest because pending demand may be served
                // synchronously as soon as the request consumer is attached.
                sink.onDispose(
                    () -> {
                      try {
                        deviceData.closeOperationHandle();
                      } catch (StatementExecutionException | IoTDBConnectionException e) {
                        log.warn("Failed to close result set of SQL: {}", sql, e);
                      }
                    });

                // Column positions do not change between rows; resolve them once.
                int devicePosition = deviceData.getColumnNames().indexOf("devices");
                // The "isAligned" column may be absent; -1 means "treat as not aligned".
                int alignedPosition = deviceData.getColumnNames().indexOf("isAligned");

                sink.onRequest(
                    n -> {
                      // n is a long and is Long.MAX_VALUE for unbounded demand, so the
                      // counter must be a long as well (an int counter would wrap and
                      // never reach n).
                      for (long i = 0; i < n; i++) {
                        if (sink.isCancelled()) {
                          return;
                        }
                        try {
                          if (!deviceData.hasNext()) {
                            sink.complete();
                            // Nothing left to emit; do not keep polling the result set.
                            return;
                          }
                          RowRecord rowRecord = deviceData.next();
                          String deviceFullName =
                              rowRecord.getFields().get(devicePosition).getStringValue();
                          String isAligned = "false";
                          if (alignedPosition != -1) {
                            isAligned =
                                rowRecord.getFields().get(alignedPosition).getStringValue();
                          }
                          DeviceModel deviceModel = new DeviceModel();
                          deviceModel.setDeviceName(deviceFullName);
                          deviceModel.setAligned(Boolean.parseBoolean(isAligned));
                          sink.next(deviceModel);
                        } catch (StatementExecutionException | IoTDBConnectionException e) {
                          log.error("异常信息:", e);
                          // Retrying the same failing call for the rest of the demand is
                          // pointless; terminate the stream the same way a failed query
                          // does below.
                          sink.complete();
                          return;
                        }
                      }
                    });
              } catch (StatementExecutionException | IoTDBConnectionException e) {
                // Trailing throwable argument makes the logger print the stack trace too.
                log.error("异常信息对应的SQL:{}", sql, e);
                sink.complete();
              }
            });
      });
}