public Flux parseTimeSeriesRowModel()

in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InCompressDataSource.java [138:261]


  /**
   * Parses one compressed backup stream into a flux of {@link TimeSeriesRowModel}s.
   *
   * <p>Nothing is read until the flux is subscribed. At that point the method reads the
   * {@code PipelineContext} (key {@code "pipelineContext"}) and the version string (key
   * {@code "VERSION"}) from the Reactor context, parses the header of {@code in}, looks up the
   * device and the data types of its timeseries through the session of the {@code ImportModel},
   * and then decodes the data section batch by batch, emitting every row in order.
   *
   * <p>After the last batch a marker row is emitted whose device name is
   * {@code "finish," + deviceName} and whose field list is empty; the flux then completes.
   * {@code IOException}, {@code StatementExecutionException}, {@code IoTDBConnectionException}
   * and {@code ParamCheckException} are delivered as an error signal instead of being thrown.
   *
   * <p>{@code in} is closed when parsing ends, successfully or not, so the returned flux should
   * be subscribed at most once.
   *
   * @param in the compressed input to read; closed by this method
   * @return a flux of the decoded rows followed by the finish marker row
   */
  public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
    return Flux.deferContextual(
        context -> {
          PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
          ImportModel importModel = pcontext.getModel();
          String version = context.get("VERSION");
          return Flux.create(
              (Consumer<FluxSink<TimeSeriesRowModel>>)
                  sink -> emitCompressedRows(in, importModel, version, sink));
        });
  }

  /** Reads the whole compressed stream, pushes its rows into {@code sink}, then closes it. */
  private void emitCompressedRows(
      InputStream in, ImportModel importModel, String version, FluxSink<TimeSeriesRowModel> sink) {
    try {
      Pair<String[], String> pair = parseCompressHeader(in);
      String[] timeseries = pair.getLeft();
      String deviceName = resolveHeaderDeviceName(timeseries);

      DeviceModel deviceModel = loadDeviceModel(importModel, deviceName, version);
      Map<String, TSDataType> tsDataTypeMap =
          loadTimeseriesDataTypes(importModel, deviceName, version);

      // One column of timestamps plus one value column per header timeseries.
      int columnCount = timeseries.length + 1;
      while (true) {
        List<String[]> data = parseCompressData(in, columnCount, pair.getRight());
        if (data.isEmpty()) {
          // An empty batch marks the end of the data section.
          break;
        }

        // data.get(0) holds the timestamps: one row model per timestamp.
        List<TimeSeriesRowModel> rowModelList = new ArrayList<>();
        for (String times : data.get(0)) {
          TimeSeriesRowModel rowModel = new TimeSeriesRowModel();
          rowModel.setTimestamp(times);
          rowModel.setDeviceModel(deviceModel);
          rowModelList.add(rowModel);
        }

        // data.get(i) holds the values of timeseries[i - 1]; entry j belongs to row j.
        for (int i = 1; i < data.size(); i++) {
          String[] values = data.get(i);
          String measurement = timeseries[i - 1];
          // NOTE(review): type is null when the server did not report this timeseries;
          // confirm that generateFieldValue tolerates a Field built from a null type.
          TSDataType type = tsDataTypeMap.get(measurement);
          for (int j = 0; j < values.length; j++) {
            TimeSeriesRowModel rowModel = rowModelList.get(j);
            if (rowModel.getIFieldList() == null) {
              rowModel.setIFieldList(new ArrayList<>());
            }
            String value = values[j];
            IField iField = new IField();
            iField.setColumnName(measurement);
            if (value == null) {
              iField.setField(null);
            } else {
              Field field = new Field(type);
              field = importPipelineService.generateFieldValue(field, value);
              iField.setField(FieldCopy.copy(field));
            }
            rowModel.getIFieldList().add(iField);
          }
        }

        for (TimeSeriesRowModel rowModel : rowModelList) {
          sink.next(rowModel);
        }
      }

      sink.next(buildFinishRowModel(deviceName));
      sink.complete();
    } catch (IOException
        | StatementExecutionException
        | IoTDBConnectionException
        | ParamCheckException e) {
      sink.error(e);
    } finally {
      try {
        if (in != null) {
          in.close();
        }
      } catch (IOException e) {
        log.error("异常信息:", e);
      }
    }
  }

  /** Derives the device path from the first header timeseries (everything before the last dot). */
  private String resolveHeaderDeviceName(String[] timeseries) throws ParamCheckException {
    if (timeseries.length == 0) {
      throw new ParamCheckException("the compressed file header does not contain any timeseries");
    }
    String firstPath = timeseries[0];
    int lastDot = firstPath.lastIndexOf('.');
    if (lastDot < 0) {
      throw new ParamCheckException(
          "the timeseries path in the compressed file header is invalid: " + firstPath);
    }
    return firstPath.substring(0, lastDot);
  }

  /**
   * Runs {@code show devices} for the device and builds its {@link DeviceModel}. The returned model
   * is left unpopulated when the query yields no row.
   */
  private DeviceModel loadDeviceModel(ImportModel importModel, String deviceName, String version)
      throws StatementExecutionException, IoTDBConnectionException {
    String sql = " show devices " + ExportPipelineService.formatPath(deviceName, version);
    SessionDataSet deviceSet = importModel.getSession().executeQueryStatement(sql);
    try {
      List<String> columnNameList = deviceSet.getColumnNames();
      DeviceModel deviceModel = new DeviceModel();
      if (deviceSet.hasNext()) {
        RowRecord record = deviceSet.next();
        // The "isAligned" column may be absent from the result; treat it as not aligned then.
        int position = columnNameList.indexOf("isAligned");
        String aligned = "false";
        if (position != -1) {
          aligned = record.getFields().get(position).getStringValue();
        }
        deviceModel.setDeviceName(deviceName);
        deviceModel.setAligned(Boolean.parseBoolean(aligned));
      }
      return deviceModel;
    } finally {
      // Only the first row is read, so the query handle must be released explicitly.
      closeDataSetQuietly(deviceSet);
    }
  }

  /**
   * Runs {@code show timeseries} for the device and maps each returned timeseries to its data
   * type. Fails with {@code ParamCheckException} when the query returns no timeseries.
   */
  private Map<String, TSDataType> loadTimeseriesDataTypes(
      ImportModel importModel, String deviceName, String version)
      throws StatementExecutionException, IoTDBConnectionException, ParamCheckException {
    String sql =
        "show timeseries " + ExportPipelineService.formatPath(deviceName, version) + ".*";
    SessionDataSet timeseriesSet = importModel.getSession().executeQueryStatement(sql);
    Map<String, TSDataType> tsDataTypeMap = new HashMap<>();
    try {
      List<String> columnNameList = timeseriesSet.getColumnNames();
      // Column positions are the same for every row, so resolve them once.
      int namePosition = columnNameList.indexOf("timeseries");
      int typePosition = columnNameList.indexOf("dataType");
      while (timeseriesSet.hasNext()) {
        RowRecord record = timeseriesSet.next();
        String measurement = record.getFields().get(namePosition).getStringValue();
        String type = record.getFields().get(typePosition).getStringValue();
        tsDataTypeMap.put(measurement, importPipelineService.parseTsDataType(type));
      }
    } finally {
      closeDataSetQuietly(timeseriesSet);
    }
    if (tsDataTypeMap.isEmpty()) {
      throw new ParamCheckException(
          "the timeseries of device:"
              + ExportPipelineService.formatPath(deviceName, version)
              + " does not exist");
    }
    return tsDataTypeMap;
  }

  /** Builds the marker row that is emitted after the last data row of a device. */
  private TimeSeriesRowModel buildFinishRowModel(String deviceName) {
    DeviceModel finishDeviceModel = new DeviceModel();
    finishDeviceModel.setDeviceName("finish," + deviceName);
    TimeSeriesRowModel finishRowModel = new TimeSeriesRowModel();
    finishRowModel.setDeviceModel(finishDeviceModel);
    finishRowModel.setIFieldList(new ArrayList<>());
    return finishRowModel;
  }

  /** Releases the query handle of a result set; a failure is logged and does not abort parsing. */
  private void closeDataSetQuietly(SessionDataSet dataSet) {
    try {
      dataSet.closeOperationHandle();
    } catch (StatementExecutionException | IoTDBConnectionException e) {
      log.error("failed to close the query result set", e);
    }
  }