public Flux parseTimeSeriesRowModel()

in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InCsvDataSource.java [133:264]


  /**
   * Parses a CSV stream into a stream of {@link TimeSeriesRowModel}s.
   *
   * <p>The first CSV record is used as the header. The column named {@code Time} supplies the row
   * timestamp; every other column name is treated as a full timeseries path whose prefix (up to
   * the last {@code '.'}) is the device path. The device's {@code isAligned} flag and the data
   * type of each timeseries are looked up through the session of the {@link ImportModel} found in
   * the Reactor context under {@code "pipelineContext"}; the {@code "VERSION"} context entry is
   * passed to {@code ExportPipelineService.formatPath}.
   *
   * <p>After the last data row a marker row is emitted whose device name is {@code "finish,"}
   * followed by the device name. If the CSV contains only the {@code Time} column nothing is
   * emitted and the stream simply completes.
   *
   * <p>{@code in} is always closed once parsing ends, whether it succeeds or fails.
   *
   * @param in the CSV input; read with the charset configured on the import model
   * @return a flux of parsed rows followed by the finish marker; it errors with {@link
   *     IOException}, {@link StatementExecutionException}, {@link IoTDBConnectionException} or
   *     {@link ParamCheckException} when reading, querying or validation fails
   */
  public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
    return Flux.deferContextual(
        context -> {
          PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
          ImportModel importModel = pcontext.getModel();
          String version = context.get("VERSION");
          return Flux.create(
              (Consumer<FluxSink<TimeSeriesRowModel>>)
                  sink -> emitCsvRows(in, importModel, version, sink));
        });
  }

  /** Reads the CSV from {@code in} and pushes every row, then the finish marker, into the sink. */
  private void emitCsvRows(
      InputStream in,
      ImportModel importModel,
      String version,
      FluxSink<TimeSeriesRowModel> sink) {
    try (CSVParser parser =
        CSVFormat.Builder.create(CSVFormat.DEFAULT)
            .setHeader()
            .setSkipHeaderRecord(true)
            .setQuote('\\')
            .setEscape('\\')
            .setIgnoreEmptyLines(true)
            .setQuoteMode(QuoteMode.NONE)
            .build()
            .parse(new InputStreamReader(in, importModel.getCharSet()))) {
      Map<String, Integer> fileHeaderMap = parser.getHeaderMap();
      String entityPath = resolveEntityPath(fileHeaderMap);

      // entityPath is null when the CSV holds nothing but the Time column.
      if (entityPath != null) {
        DeviceModel deviceModel = loadDeviceModel(importModel, entityPath, version);
        Map<String, TSDataType> tsDataTypeMap =
            loadTimeseriesDataTypes(importModel, entityPath, version);
        if (tsDataTypeMap.isEmpty()) {
          throw new ParamCheckException(
              "the timeseries of device:"
                  + ExportPipelineService.formatPath(entityPath, version)
                  + " do not exist");
        }

        Iterator<CSVRecord> it = parser.iterator();
        // Stop reading as soon as the subscriber cancels; signals sent after cancellation are
        // dropped by the sink anyway, so continuing would only waste work.
        while (!sink.isCancelled() && it.hasNext()) {
          sink.next(buildRowModel(it.next(), fileHeaderMap, tsDataTypeMap, deviceModel));
        }
        sink.next(buildFinishMarker(deviceModel));
      }
      sink.complete();
    } catch (IOException
        | StatementExecutionException
        | IoTDBConnectionException
        | ParamCheckException e) {
      sink.error(e);
    } finally {
      // Also covers the case where the parser could not be created and therefore never took
      // ownership of the stream.
      try {
        if (in != null) {
          in.close();
        }
      } catch (IOException e) {
        log.error("异常信息:", e);
      }
    }
  }

  /**
   * Derives the device path from the non-{@code Time} headers: the part of the header before its
   * last {@code '.'}. When several headers are present the last one wins.
   *
   * @return the device path, or {@code null} when there is no header other than {@code Time}
   * @throws ParamCheckException if a header contains no {@code '.'} and so cannot be split
   */
  private String resolveEntityPath(Map<String, Integer> fileHeaderMap)
      throws ParamCheckException {
    String entityPath = null;
    for (String header : fileHeaderMap.keySet()) {
      if ("Time".equals(header)) {
        continue;
      }
      int lastDot = header.lastIndexOf('.');
      if (lastDot < 0) {
        throw new ParamCheckException(
            "the csv header:" + header + " is not a full timeseries path");
      }
      entityPath = header.substring(0, lastDot);
    }
    return entityPath;
  }

  /**
   * Runs {@code show devices} for the device and builds its {@link DeviceModel}. The name and
   * aligned flag are only populated when the query returns a row; the aligned flag defaults to
   * {@code false} when the result has no {@code isAligned} column.
   */
  private DeviceModel loadDeviceModel(ImportModel importModel, String entityPath, String version)
      throws StatementExecutionException, IoTDBConnectionException {
    SessionDataSet alignedSet =
        importModel
            .getSession()
            .executeQueryStatement(
                " show devices " + ExportPipelineService.formatPath(entityPath, version));
    try {
      DeviceModel deviceModel = new DeviceModel();
      if (alignedSet.hasNext()) {
        RowRecord record = alignedSet.next();
        int position = alignedSet.getColumnNames().indexOf("isAligned");
        String aligned = "false";
        if (position != -1) {
          aligned = record.getFields().get(position).getStringValue();
        }
        deviceModel.setDeviceName(entityPath);
        deviceModel.setAligned(Boolean.parseBoolean(aligned));
      }
      return deviceModel;
    } finally {
      // Release the query handle held for this result set.
      alignedSet.closeOperationHandle();
    }
  }

  /**
   * Runs {@code show timeseries <device>.*} and maps each returned timeseries path to its data
   * type.
   */
  private Map<String, TSDataType> loadTimeseriesDataTypes(
      ImportModel importModel, String entityPath, String version)
      throws StatementExecutionException, IoTDBConnectionException {
    SessionDataSet timeseriesSet =
        importModel
            .getSession()
            .executeQueryStatement(
                "show timeseries "
                    + ExportPipelineService.formatPath(entityPath, version)
                    + ".*");
    try {
      List<String> columnNameList = timeseriesSet.getColumnNames();
      // Column positions are the same for every row, so resolve them once.
      int timeseriesPosition = columnNameList.indexOf("timeseries");
      int dataTypePosition = columnNameList.indexOf("dataType");
      Map<String, TSDataType> tsDataTypeMap = new HashMap<>();
      while (timeseriesSet.hasNext()) {
        RowRecord record = timeseriesSet.next();
        String timeseries = record.getFields().get(timeseriesPosition).getStringValue();
        String type = record.getFields().get(dataTypePosition).getStringValue();
        tsDataTypeMap.put(timeseries, importPipelineService.parseTsDataType(type));
      }
      return tsDataTypeMap;
    } finally {
      // Release the query handle held for this result set.
      timeseriesSet.closeOperationHandle();
    }
  }

  /**
   * Converts one CSV record into a row model: the {@code Time} column becomes the timestamp and
   * every other column becomes an {@link IField} typed from {@code tsDataTypeMap}.
   */
  private TimeSeriesRowModel buildRowModel(
      CSVRecord record,
      Map<String, Integer> fileHeaderMap,
      Map<String, TSDataType> tsDataTypeMap,
      DeviceModel deviceModel) {
    TimeSeriesRowModel rowModel = new TimeSeriesRowModel();
    rowModel.setDeviceModel(deviceModel);
    // Callers only get here when at least one non-Time column exists, so the list is never
    // left empty by this method.
    rowModel.setIFieldList(new ArrayList<>());
    for (Map.Entry<String, Integer> column : fileHeaderMap.entrySet()) {
      String header = column.getKey();
      if ("Time".equals(header)) {
        rowModel.setTimestamp(record.get(column.getValue()));
        continue;
      }
      IField iField = new IField();
      iField.setColumnName(header);
      Field field = new Field(tsDataTypeMap.get(header));
      try {
        field = importPipelineService.generateFieldValue(field, record.get(column.getValue()));
      } catch (Exception e) {
        // Best effort: a cell that cannot be read or converted is logged and the field is
        // emitted as created above, so one bad value does not abort the whole import.
        log.error("异常信息:", e);
      }
      iField.setField(FieldCopy.copy(field));
      rowModel.getIFieldList().add(iField);
    }
    return rowModel;
  }

  /**
   * Builds the marker row emitted after the last data row: an empty field list and a device name
   * of {@code "finish,"} followed by the device name.
   */
  private TimeSeriesRowModel buildFinishMarker(DeviceModel deviceModel) {
    DeviceModel finishDeviceModel = new DeviceModel();
    finishDeviceModel.setDeviceName("finish," + deviceModel.getDeviceName());
    TimeSeriesRowModel finishRowModel = new TimeSeriesRowModel();
    finishRowModel.setDeviceModel(finishDeviceModel);
    finishRowModel.setIFieldList(new ArrayList<>());
    return finishRowModel;
  }