in backup-core/src/main/java/org/apache/iotdb/backup/core/service/ExportPipelineService.java [295:415]
private void createStreamData(
DeviceModel deviceModel,
List<TimeseriesModel> timeseries,
Session session,
FluxSink<TimeSeriesRowModel> sink,
ExportModel exportModel,
String version)
throws StatementExecutionException, IoTDBConnectionException {
List<List<TimeseriesModel>> groupTimeseriesList = ListUtils.partition(timeseries, T_LIMIT);
List<SessionDataSet> dataSetList = new ArrayList<>();
for (List<TimeseriesModel> l : groupTimeseriesList) {
StringBuilder timeseriesBuffer = new StringBuilder();
for (TimeseriesModel s : l) {
if (timeseriesBuffer.length() == 0) {
timeseriesBuffer.append(
formatMeasurement(
s.getName().replace(deviceModel.getDeviceName() + ".", ""), version));
} else {
timeseriesBuffer
.append(",")
.append(
formatMeasurement(
s.getName().replace(deviceModel.getDeviceName() + ".", ""), version));
}
}
StringBuilder sqlBuffer = new StringBuilder();
sqlBuffer
.append("select ")
.append(timeseriesBuffer.toString())
.append(" from ")
.append(formatPath(deviceModel.getDeviceName(), version));
if (exportModel.getWhereClause() != null && !"".equals(exportModel.getWhereClause())) {
sqlBuffer.append(" where ").append(exportModel.getWhereClause());
}
String sql = sqlBuffer.toString();
SessionDataSet deviceDetials = null;
try {
deviceDetials = session.executeQueryStatement(sql, Long.MAX_VALUE);
} catch (Exception e) {
log.error(sql);
throw e;
}
dataSetList.add(deviceDetials);
}
sink.onRequest(
n -> {
synchronized (dataSetList) {
Map<Long, List<IField>[]> sinkPoolMap = new TreeMap<>();
Long mark = 0L; // 时间戳标志,sinkpoolMap根据此生成流,如果有小于此值得数据,sink.next会一次性添加多个stream流数据
Long stopMark = 0L; // 结束标志,当一个时间戳连续出现两次的时候,表示读取完毕,单组数据遍历完毕
int loopMark =
0; // 假设一个时间序列按列分为了2组,首先遍历第一组 loopMark=0,当第一组遍历完成后,第二组也许比第一组数据多一些,在遍历第二组 loopMark=1
boolean stopFlag = false;
if (dataSetList == null || dataSetList.size() == 0) {
stopFlag = true;
}
int j = 0;
try {
// 如果列数大于1000,拼接row数据
while (true) {
for (int i = 0; i < dataSetList.size(); i++) {
SessionDataSet set = dataSetList.get(i);
mark =
recursionSessionData(
i,
loopMark,
mark,
dataSetList.size(),
set,
sinkPoolMap,
deviceModel,
timeseries);
if (i == dataSetList.size() - 1) {
if (stopMark.equals(mark)) {
if (loopMark == dataSetList.size() - 1) {
stopFlag = true;
}
loopMark++;
}
stopMark = mark;
for (Long key : sinkPoolMap.keySet()) {
if (key <= mark) {
try {
TimeSeriesRowModel rowModel =
conformToRowData(
sinkPoolMap.get(key), groupTimeseriesList, deviceModel, key);
sinkPoolMap.remove(key);
sink.next(rowModel);
if (j == n) {
return;
}
j++;
} catch (Exception e) {
log.error("异常信息:", e);
}
}
}
}
}
if (stopFlag) {
// 流结束标志
TimeSeriesRowModel finishRowModel = new TimeSeriesRowModel();
DeviceModel finishDeviceModel = new DeviceModel();
StringBuilder builder = new StringBuilder();
builder.append("finish,").append(deviceModel.getDeviceName());
finishDeviceModel.setDeviceName(builder.toString());
finishRowModel.setDeviceModel(finishDeviceModel);
finishRowModel.setIFieldList(new ArrayList<>());
sink.next(finishRowModel);
sink.complete();
break;
}
}
} catch (StatementExecutionException | IoTDBConnectionException e) {
log.error("异常信息:", e);
}
}
});
}