private void createStreamData()

in backup-core/src/main/java/org/apache/iotdb/backup/core/service/ExportPipelineService.java [295:415]


  /**
   * Queries the measurements of one device and streams the resulting rows into {@code sink}.
   *
   * <p>The measurements are partitioned into groups of at most {@code T_LIMIT}. One {@code select
   * ... from <device> [where ...]} statement is executed per group before this method returns.
   * The resulting data sets are then read from inside the sink's {@code onRequest} callback: each
   * pass calls {@code recursionSessionData} once per data set, and after the last data set every
   * buffered timestamp that is {@code <= mark} is turned into a row and emitted. When the stop
   * condition is reached, a marker row whose device name is {@code "finish," + deviceName} is
   * emitted and the sink is completed.
   *
   * <p>NOTE(review): the data sets opened here are not closed in this method; confirm that {@code
   * recursionSessionData} or another owner releases them.
   *
   * <p>NOTE(review): exceptions raised inside the callback are only logged, so in that case the
   * sink is neither completed nor errored; confirm subscribers cannot wait forever.
   *
   * @param deviceModel device to read; its name is the {@code from} path and is stripped from the
   *     measurement names
   * @param timeseries measurements of the device to select
   * @param session session used to execute the queries
   * @param sink sink that receives the rows and the final marker row
   * @param exportModel supplies the optional where clause
   * @param version passed through to {@code formatMeasurement} and {@code formatPath}
   * @throws StatementExecutionException if one of the queries fails (the SQL is logged first)
   * @throws IoTDBConnectionException if one of the queries fails (the SQL is logged first)
   */
  private void createStreamData(
      DeviceModel deviceModel,
      List<TimeseriesModel> timeseries,
      Session session,
      FluxSink<TimeSeriesRowModel> sink,
      ExportModel exportModel,
      String version)
      throws StatementExecutionException, IoTDBConnectionException {

    List<List<TimeseriesModel>> groupTimeseriesList = ListUtils.partition(timeseries, T_LIMIT);
    List<SessionDataSet> dataSetList = new ArrayList<>();

    // Loop-invariant: the "<device>." text removed from every measurement name.
    String devicePrefix = deviceModel.getDeviceName() + ".";

    for (List<TimeseriesModel> group : groupTimeseriesList) {
      // Comma-separated measurement list for this group.
      StringBuilder measurements = new StringBuilder();
      for (TimeseriesModel series : group) {
        if (measurements.length() > 0) {
          measurements.append(",");
        }
        measurements.append(
            formatMeasurement(series.getName().replace(devicePrefix, ""), version));
      }

      StringBuilder sqlBuffer = new StringBuilder();
      sqlBuffer
          .append("select ")
          .append(measurements)
          .append(" from ")
          .append(formatPath(deviceModel.getDeviceName(), version));
      String whereClause = exportModel.getWhereClause();
      if (whereClause != null && !"".equals(whereClause)) {
        sqlBuffer.append(" where ").append(whereClause);
      }
      String sql = sqlBuffer.toString();

      SessionDataSet dataSet;
      try {
        dataSet = session.executeQueryStatement(sql, Long.MAX_VALUE);
      } catch (Exception e) {
        // Log the failing statement for diagnosis, then let the caller handle the failure.
        log.error(sql);
        throw e;
      }
      dataSetList.add(dataSet);
    }

    sink.onRequest(
        n -> {
          synchronized (dataSetList) {
            // Rows waiting to be emitted, keyed by timestamp in ascending order.
            Map<Long, List<IField>[]> sinkPoolMap = new TreeMap<>();
            // Timestamp watermark: rows buffered in sinkPoolMap with a timestamp <= mark are
            // emitted, so several rows may be pushed to the sink in one pass.
            Long mark = 0L;
            // End marker: seeing the same watermark on two consecutive passes means the group
            // currently being read is exhausted.
            Long stopMark = 0L;
            // Index of the group currently driving the read. With two column groups, group 0 is
            // traversed first (loopMark = 0); the second group may hold more data than the
            // first, so it is traversed afterwards (loopMark = 1).
            int loopMark = 0;
            // With no data set at all there is nothing to read: finish immediately.
            boolean stopFlag = dataSetList.isEmpty();
            // Rows emitted during this request.
            int j = 0;
            try {
              // When the device has more columns than fit in one query, a full row is stitched
              // together from the per-group data sets.
              while (true) {
                for (int i = 0; i < dataSetList.size(); i++) {
                  SessionDataSet set = dataSetList.get(i);
                  mark =
                      recursionSessionData(
                          i,
                          loopMark,
                          mark,
                          dataSetList.size(),
                          set,
                          sinkPoolMap,
                          deviceModel,
                          timeseries);
                  if (i != dataSetList.size() - 1) {
                    continue;
                  }
                  // All groups have been visited in this pass.
                  if (stopMark.equals(mark)) {
                    if (loopMark == dataSetList.size() - 1) {
                      stopFlag = true;
                    }
                    loopMark++;
                  }
                  stopMark = mark;
                  // Iterate over a snapshot of the keys: removing entries from the TreeMap while
                  // iterating its live key set throws ConcurrentModificationException as soon
                  // as another key follows the removed one.
                  List<Long> bufferedKeys = new ArrayList<>(sinkPoolMap.keySet());
                  for (Long key : bufferedKeys) {
                    if (key <= mark) {
                      try {
                        TimeSeriesRowModel rowModel =
                            conformToRowData(
                                sinkPoolMap.get(key), groupTimeseriesList, deviceModel, key);
                        sinkPoolMap.remove(key);
                        sink.next(rowModel);
                        // NOTE(review): this returns after n + 1 rows and drops the local
                        // buffer and markers; confirm that is the intended demand handling.
                        if (j == n) {
                          return;
                        }
                        j++;
                      } catch (Exception e) {
                        log.error("异常信息:", e);
                      }
                    }
                  }
                }
                if (stopFlag) {
                  // End-of-stream marker row, followed by completion of the sink.
                  sink.next(buildFinishRowModel(deviceModel));
                  sink.complete();
                  break;
                }
              }
            } catch (StatementExecutionException | IoTDBConnectionException e) {
              log.error("异常信息:", e);
            }
          }
        });
  }

  /** Builds the end-of-stream marker row: device name "finish,&lt;deviceName&gt;", no fields. */
  private TimeSeriesRowModel buildFinishRowModel(DeviceModel deviceModel) {
    DeviceModel finishDeviceModel = new DeviceModel();
    finishDeviceModel.setDeviceName("finish," + deviceModel.getDeviceName());
    TimeSeriesRowModel finishRowModel = new TimeSeriesRowModel();
    finishRowModel.setDeviceModel(finishDeviceModel);
    finishRowModel.setIFieldList(new ArrayList<>());
    return finishRowModel;
  }