public Flux parseTimeSeriesRowModel()

in backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InStructureSource.java [128:188]


  /**
   * Parses a CSV stream into a {@link Flux} of {@link TimeSeriesRowModel}, one per data record.
   *
   * <p>The first record of the CSV is taken as the header. For every following record:
   *
   * <ul>
   *   <li>the {@code aligned} column is parsed as a boolean and stored on the row's device model;
   *   <li>the device name is the {@code timeseries} column value up to (excluding) its last
   *       {@code '.'};
   *   <li>every column except {@code aligned} becomes one {@code IField} whose column name is the
   *       CSV header and whose value is produced by {@code generateFieldValue} on a TEXT field.
   * </ul>
   *
   * <p>The subscriber context must contain a {@code "pipelineContext"} entry; its {@link
   * ImportModel} supplies the charset used to decode {@code in}.
   *
   * <p>Parsing happens on subscription. {@code in} is always closed when parsing ends (normally,
   * on error, or after cancellation), so the returned flux can only be consumed once.
   *
   * <p>Errors are delivered through the flux: {@link IOException} from opening the parser, and
   * {@link IllegalArgumentException} when a data record is present but the {@code aligned} or
   * {@code timeseries} column is missing, or when a {@code timeseries} value contains no
   * {@code '.'}.
   *
   * @param in CSV input stream; closed by this method
   * @return a flux emitting one row model per CSV data record
   */
  public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
    return Flux.deferContextual(
        context -> {
          PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
          ImportModel importModel = pcontext.getModel();
          return Flux.create(
              (Consumer<FluxSink<TimeSeriesRowModel>>)
                  sink -> {
                    try {
                      CSVParser parser =
                          CSVFormat.Builder.create(CSVFormat.DEFAULT)
                              .setHeader()
                              .setSkipHeaderRecord(true)
                              .setQuote('\\')
                              .setEscape('\\')
                              .setIgnoreEmptyLines(true)
                              .setQuoteMode(QuoteMode.NONE)
                              .build()
                              .parse(new InputStreamReader(in, importModel.getCharSet()));
                      Iterator<CSVRecord> it = parser.iterator();
                      Map<String, Integer> fileHeaderMap = parser.getHeaderMap();

                      // The header layout is fixed for the whole file, so resolve the two
                      // required column positions once instead of once per record.
                      Integer alignedIndex = fileHeaderMap.get("aligned");
                      Integer timeseriesIndex = fileHeaderMap.get("timeseries");

                      // Only complain about missing columns when there is data to parse, so an
                      // empty file still completes without emitting anything.
                      if ((alignedIndex == null || timeseriesIndex == null) && it.hasNext()) {
                        sink.error(
                            new IllegalArgumentException(
                                "CSV header must contain columns 'aligned' and 'timeseries', found: "
                                    + fileHeaderMap.keySet()));
                        return;
                      }

                      // Stop reading as soon as the subscriber cancels; otherwise the rest of
                      // the file would be parsed for nobody.
                      while (!sink.isCancelled() && it.hasNext()) {
                        CSVRecord record = it.next();

                        DeviceModel deviceModel = new DeviceModel();
                        String alignedString = parseCsvString(record.get(alignedIndex));
                        deviceModel.setAligned(Boolean.parseBoolean(alignedString));

                        String deviceId = parseCsvString(record.get(timeseriesIndex));
                        int lastDot = deviceId.lastIndexOf('.');
                        if (lastDot < 0) {
                          sink.error(
                              new IllegalArgumentException(
                                  "Cannot derive device name: timeseries '"
                                      + deviceId
                                      + "' contains no '.'"));
                          return;
                        }
                        deviceModel.setDeviceName(deviceId.substring(0, lastDot));

                        List<IField> iFieldList = new ArrayList<>();
                        for (Map.Entry<String, Integer> column : fileHeaderMap.entrySet()) {
                          String header = column.getKey();
                          // "aligned" is carried on the device model, not as a field.
                          if ("aligned".equals(header)) {
                            continue;
                          }
                          Field field =
                              generateFieldValue(new Field(TEXT), record.get(column.getValue()));
                          IField iField = new IField();
                          iField.setField(FieldCopy.copy(field));
                          iField.setColumnName(header);
                          iFieldList.add(iField);
                        }

                        TimeSeriesRowModel rowModel = new TimeSeriesRowModel();
                        rowModel.setIFieldList(iFieldList);
                        rowModel.setDeviceModel(deviceModel);
                        sink.next(rowModel);
                      }
                      sink.complete();
                    } catch (IOException e) {
                      sink.error(e);
                    } finally {
                      // Closing the stream releases what the parser reads from; a failure here
                      // is reported but must not alter the already-signalled outcome.
                      try {
                        in.close();
                      } catch (IOException e) {
                        e.printStackTrace();
                      }
                    }
                  });
        });
  }