backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InCompressDataSource.java [80:147]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  // Tracks every InputStream opened while parsing so the doFinally hook below can
  // close them all once the pipeline terminates.
  private ConcurrentHashMap<String, List<InputStream>> COMPRESS_MAP = new ConcurrentHashMap<>();

  // Single-element holder: the total file count is computed at subscribe time and
  // published to downstream operators through the Reactor context.
  private Integer[] totalSize = new Integer[1];

  // Degree of parallelism for the file-parsing stage.
  private int parallelism;

  @Override
  public Function<Flux<String>, Flux<TimeSeriesRowModel>> doExecute() {
    return flux ->
        flux.flatMap(
                s ->
                    Flux.deferContextual(
                        contextView -> {
                          PipelineContext<ImportModel> context = contextView.get("pipelineContext");
                          ImportModel importModel = context.getModel();
                          FilenameFilter filenameFilter = initFileFilter(importModel);
                          totalSize[0] =
                              importPipelineService.getFileArray(
                                      filenameFilter, importModel.getFileFolder())
                                  .length;
                          return importPipelineService.parseFluxFileName(
                              filenameFilter, COMPRESS_MAP);
                        }))
            .parallel(parallelism)
            .runOn(scheduler)
            .flatMap(this::parseTimeSeriesRowModel)
            .transform(doNext())
            .sequential()
            .doFinally(
                signalType -> {
                  // Close every stream registered while parsing; failures are ignored
                  // because the pipeline is already terminating.
                  for (List<InputStream> streams : COMPRESS_MAP.values()) {
                    streams.forEach(
                        inputStream -> {
                          if (inputStream != null) {
                            try {
                              inputStream.close();
                            } catch (IOException e) {
                              // best-effort close; nothing useful to do here
                            }
                          }
                        });
                  }
                  scheduler.dispose();
                })
            .contextWrite(context -> context.put("totalSize", totalSize));
  }

  /**
   * Reads a csv file and converts its rows into a stream of TimeSeriesRowModel.
   * fileHeaderMap maps each timeseries to its column position in the csv; tsDataTypeMap
   * maps each timeseries to its TSDataType. The type of each timeseries and its value
   * are resolved via record.get(fileHeaderMap.get(header)).
   *
   * @param in input stream of the file to parse
   * @return flux of TimeSeriesRowModel rows parsed from the stream
   */
  public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
    return Flux.deferContextual(
        context -> {
          PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
          ImportModel importModel = pcontext.getModel();
          String version = context.get("VERSION");
          return Flux.create(
              (Consumer<FluxSink<TimeSeriesRowModel>>)
                  sink -> {
                    try {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
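The doExecute pipeline above moves two values through Reactor's context: the pipelineContext it reads via deferContextual, and the file count it publishes by storing it in the one-element totalSize array and putting that holder into the context with contextWrite. Below is a minimal, self-contained sketch of that holder-plus-contextWrite pattern, assuming only reactor-core on the classpath; every name in it is illustrative rather than taken from the repository.

import reactor.core.publisher.Flux;

public class ContextHolderDemo {

  public static void main(String[] args) {
    // One-element holder, mirroring the totalSize field: written during subscription,
    // read later by operators that pull it out of the context.
    Integer[] totalSize = new Integer[1];

    Flux.deferContextual(
            ctx -> {
              totalSize[0] = 3; // e.g. the number of files discovered at subscribe time
              return Flux.just("a", "b", "c");
            })
        .flatMap(
            s ->
                Flux.deferContextual(
                    ctx -> {
                      Integer[] holder = ctx.get("totalSize"); // same array instance
                      return Flux.just(s + " of " + holder[0]);
                    }))
        // contextWrite runs at subscription time, so the holder is visible to the
        // upstream operators even though its element is filled in later.
        .contextWrite(ctx -> ctx.put("totalSize", totalSize))
        .subscribe(System.out::println); // prints: a of 3, b of 3, c of 3
  }
}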



backup-core/src/main/java/org/apache/iotdb/backup/core/pipeline/in/source/InCsvDataSource.java [75:142]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
  // Tracks every InputStream opened while parsing so the doFinally hook below can
  // close them all once the pipeline terminates.
  private ConcurrentHashMap<String, List<InputStream>> COMPRESS_MAP = new ConcurrentHashMap<>();

  // Single-element holder: the total file count is computed at subscribe time and
  // published to downstream operators through the Reactor context.
  private Integer[] totalSize = new Integer[1];

  // Degree of parallelism for the file-parsing stage.
  private int parallelism;

  @Override
  public Function<Flux<String>, Flux<TimeSeriesRowModel>> doExecute() {
    return flux ->
        flux.flatMap(
                s ->
                    Flux.deferContextual(
                        contextView -> {
                          PipelineContext<ImportModel> context = contextView.get("pipelineContext");
                          ImportModel importModel = context.getModel();
                          FilenameFilter filenameFilter = initFileFilter(importModel);
                          totalSize[0] =
                              importPipelineService.getFileArray(
                                      filenameFilter, importModel.getFileFolder())
                                  .length;
                          return importPipelineService.parseFluxFileName(
                              filenameFilter, COMPRESS_MAP);
                        }))
            .parallel(parallelism)
            .runOn(scheduler)
            .flatMap(this::parseTimeSeriesRowModel)
            .transform(doNext())
            .sequential()
            .doFinally(
                signalType -> {
                  // Close every stream registered while parsing; failures are ignored
                  // because the pipeline is already terminating.
                  for (List<InputStream> streams : COMPRESS_MAP.values()) {
                    streams.forEach(
                        inputStream -> {
                          if (inputStream != null) {
                            try {
                              inputStream.close();
                            } catch (IOException e) {
                              // best-effort close; nothing useful to do here
                            }
                          }
                        });
                  }
                  scheduler.dispose();
                })
            .contextWrite(context -> context.put("totalSize", totalSize));
  }

  /**
   * Reads a csv file and converts its rows into a stream of TimeSeriesRowModel.
   * fileHeaderMap maps each timeseries to its column position in the csv; tsDataTypeMap
   * maps each timeseries to its TSDataType. The type of each timeseries and its value
   * are resolved via record.get(fileHeaderMap.get(header)).
   *
   * @param in input stream of the file to parse
   * @return flux of TimeSeriesRowModel rows parsed from the stream
   */
  public Flux<TimeSeriesRowModel> parseTimeSeriesRowModel(InputStream in) {
    return Flux.deferContextual(
        context -> {
          PipelineContext<ImportModel> pcontext = context.get("pipelineContext");
          ImportModel importModel = pcontext.getModel();
          String version = context.get("VERSION");
          return Flux.create(
              (Consumer<FluxSink<TimeSeriesRowModel>>)
                  sink -> {
                    try {
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
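The two excerpts are identical apart from their enclosing class, which suggests hoisting the shared doExecute body into a common base type. A minimal sketch of that refactoring follows; AbstractInDataSource, openStreams, and parseOne are hypothetical names, and the real hooks would carry the repository's PipelineContext/ImportModel plumbing rather than the toy types used here.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Function;

import reactor.core.publisher.Flux;

public class DedupSketch {

  // Shared pipeline skeleton: concrete sources supply only the pieces that differ
  // (how streams are opened and how one stream is parsed).
  abstract static class AbstractInDataSource<T> {

    protected abstract Flux<InputStream> openStreams();

    protected abstract Flux<T> parseOne(InputStream in);

    Function<Flux<String>, Flux<T>> doExecute() {
      return flux -> flux.flatMap(trigger -> openStreams()).flatMap(this::parseOne);
    }
  }

  // Stand-in for a csv source; a compressed source would differ only in the two hooks.
  static class CsvLikeSource extends AbstractInDataSource<String> {

    @Override
    protected Flux<InputStream> openStreams() {
      return Flux.just(new ByteArrayInputStream("row-1\nrow-2".getBytes()));
    }

    @Override
    protected Flux<String> parseOne(InputStream in) {
      return Flux.defer(
          () -> {
            try {
              return Flux.fromArray(new String(in.readAllBytes()).split("\n"));
            } catch (IOException e) {
              return Flux.error(e);
            }
          });
    }
  }

  public static void main(String[] args) {
    new CsvLikeSource().doExecute().apply(Flux.just("go")).subscribe(System.out::println);
  }
}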




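Both sources also share the cleanup strategy of registering every opened stream in COMPRESS_MAP and closing the lot in doFinally. An alternative worth weighing is Flux.using, which scopes each stream's lifetime to its own subscription so no shared registry is needed. A minimal sketch, again assuming only reactor-core; parseOneFile is a hypothetical stand-in for parseTimeSeriesRowModel.

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import reactor.core.publisher.Flux;

public class PerStreamCleanupDemo {

  // Stand-in for parseTimeSeriesRowModel: emit the stream's content as one element.
  static Flux<String> parseOneFile(InputStream in) {
    return Flux.defer(
        () -> {
          try {
            return Flux.just(new String(in.readAllBytes(), StandardCharsets.UTF_8));
          } catch (IOException e) {
            return Flux.error(e);
          }
        });
  }

  public static void main(String[] args) {
    Flux.just("file-1", "file-2")
        .flatMap(
            name ->
                Flux.using(
                    // resource: opened when this inner Flux is subscribed
                    () -> new ByteArrayInputStream(name.getBytes(StandardCharsets.UTF_8)),
                    PerStreamCleanupDemo::parseOneFile,
                    // cleanup: runs per stream on complete, error, or cancellation,
                    // so no shared COMPRESS_MAP registry is required
                    in -> {
                      try {
                        in.close();
                      } catch (IOException e) {
                        // best-effort close; the stream is done either way
                      }
                    }))
        .subscribe(System.out::println);
  }
}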