in backup-core/src/main/java/org/apache/iotdb/backup/core/service/ExportPipelineService.java [480:546]
/**
 * Reads the next row from {@code set}, converts it into a list of {@link IField}s and stores that
 * list in slot {@code loopi} of the array kept in {@code sinkPoolMap} under the row's timestamp.
 *
 * <p>When {@code loopi == loopMark} the row's timestamp becomes the new baseline ({@code mark}).
 * For any other {@code loopi}, a row whose timestamp is smaller than a non-zero {@code mark}
 * makes the method call itself again to consume the following row of the same data set; the row
 * that triggered the nested call is still stored once the nested call returns.
 *
 * <p>NOTE(review): every consecutive row older than {@code mark} adds one stack frame, so the
 * recursion depth is bounded only by the number of such rows in {@code set}.
 *
 * @param loopi index of the slot, inside each per-timestamp array, that this data set fills
 * @param loopMark index of the data set whose rows define the baseline timestamp
 * @param mark current baseline timestamp; {@code 0} disables the skip-ahead behaviour. It is
 *     unboxed whenever a row is read, so it must be non-null unless {@code loopi == loopMark}
 *     (in which case it is overwritten before being read)
 * @param size length of any per-timestamp array created by this call
 * @param set data set to read from
 * @param sinkPoolMap timestamp to per-data-set field lists; updated in place
 * @param deviceModel device whose name length is used to strip the prefix from timeseries names
 * @param timeseries timeseries consulted for a data type when a copied field carries none
 * @return {@code mark}, replaced by the timestamp of the row read by this call when {@code loopi
 *     == loopMark}; unchanged otherwise, including when {@code set} has no more rows
 * @throws StatementExecutionException propagated from reading {@code set}
 * @throws IoTDBConnectionException propagated from reading {@code set}
 */
private Long recursionSessionData(
    int loopi,
    int loopMark,
    Long mark,
    int size,
    SessionDataSet set,
    Map<Long, List<IField>[]> sinkPoolMap,
    DeviceModel deviceModel,
    List<TimeseriesModel> timeseries)
    throws StatementExecutionException, IoTDBConnectionException {
  if (!set.hasNext()) {
    return mark;
  }

  RowRecord rowRecord = set.next();
  List<String> columnName = set.getColumnNames();
  // Drop the first column so that the remaining names line up with rowRecord.getFields()
  // (presumably the first column is the time column -- confirm against the session API).
  columnName.remove(0);

  // Walk the columns by index instead of looking each name up with indexOf(): indexOf() is
  // linear per column and always resolves duplicated names to their first occurrence.
  // ArrayList is fully qualified because the file's import block is not visible from here.
  List<IField> fieldList = new java.util.ArrayList<>(columnName.size());
  for (int position = 0; position < columnName.size(); position++) {
    String clname = columnName.get(position);
    Field field = rowRecord.getFields().get(position);
    IField iField = new IField();
    FieldCopy fieldCopy = FieldCopy.copy(field);
    if (fieldCopy.getDataType() == null) {
      // The copied field has no data type: take it from the timeseries whose name, with the
      // device-name prefix and the following separator character removed, equals the column
      // name. There is deliberately no early exit, so the last matching timeseries wins.
      for (TimeseriesModel timeseriesModel : timeseries) {
        String colName =
            timeseriesModel.getName().substring(deviceModel.getDeviceName().length() + 1);
        if (colName.equals(clname)) {
          iField.setTsDataType(timeseriesModel.getType());
        }
      }
    }
    iField.setField(fieldCopy);
    iField.setColumnName(clname);
    fieldList.add(iField);
  }

  long timestamp = rowRecord.getTimestamp();

  // Use the data belonging to loopMark as the time baseline.
  if (loopi == loopMark) {
    mark = timestamp;
  }

  // Decide whether an entry already exists BEFORE any nested call runs, so the branch taken
  // below does not depend on what the nested call stores.
  boolean entryExists = sinkPoolMap.get(timestamp) != null;

  // This row is older than the baseline: also consume the following row of the same data set.
  // The nested result is ignored; a nested call never changes mark, because this condition
  // cannot hold when loopi == loopMark (mark was just set to this row's timestamp).
  if (mark != 0 && timestamp < mark) {
    recursionSessionData(loopi, loopMark, mark, size, set, sinkPoolMap, deviceModel, timeseries);
  }

  if (entryExists) {
    List<IField>[] array = sinkPoolMap.get(timestamp);
    if (array != null) {
      array[loopi] = fieldList;
      // Only has an effect if the mapping disappeared since the lookup just above.
      sinkPoolMap.putIfAbsent(timestamp, array);
    }
  } else {
    // Generic arrays cannot be created directly, hence the raw element type.
    @SuppressWarnings({"unchecked", "rawtypes"})
    List<IField>[] array = new List[size];
    array[loopi] = fieldList;
    sinkPoolMap.put(timestamp, array);
  }
  return mark;
}