private ImportDataVO loadDataFromCSV()

in backend/src/main/java/org/apache/iotdb/admin/tool/ImportCsv.java [117:245]


  /**
   * Imports the data rows of a CSV file through the given session.
   *
   * <p>The first line of the file is the header. It must contain more than one column, and every
   * column after the first must start with {@code root.} and contain at least two dots. The first
   * column of every data row is handed to {@code parseTime} and used as the record time. Header
   * columns are grouped per device by {@code splitColToDeviceAndMeasurement} (defined elsewhere in
   * this class).
   *
   * <p>For every data row one record per device is buffered, except for devices whose values in
   * that row are all null or empty. The buffer is sent with {@code session.insertRecords} every
   * 10000 rows and once more at the end of the file. Insert failures whose message contains
   * {@code "failed to insert measurements"} are collected and written to an error file; the file
   * is only kept (and a download URI only returned) when at least one such error was collected.
   *
   * @param file the CSV file to read, decoded as UTF-8
   * @param session the session used to insert the records
   * @return an {@link ImportDataVO} built from the total count (data rows multiplied by the number
   *     of non-time header columns), the number of collected insert errors, and the download URI
   *     of the error file, or {@code null} as URI when there were no errors
   * @throws BaseException if the header is missing or illegal, a row has fewer columns than the
   *     header requires, the file cannot be opened or read, or an insert fails for any reason
   *     other than the collected "failed to insert measurements" case
   */
  private ImportDataVO loadDataFromCSV(File file, Session session) throws BaseException {
    log.info("Start import data from file:" + file.getName());
    String errorFileName = "error" + System.currentTimeMillis() + ".txt";
    File errorFile = new File(this.fileStorageLocation.resolve(errorFileName).toString());
    // Becomes true only when a download URI pointing at the error file is returned. In every
    // other case nothing references the file, so it is removed in the finally block.
    boolean keepErrorFile = false;
    final int batchSize = 10000;

    try (BufferedReader br =
            new BufferedReader(
                new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8));
        BufferedWriter bw = new BufferedWriter(new FileWriter(errorFile))) {
      String header = br.readLine();
      if (header == null) {
        // Empty file: there is no header line at all.
        throw new BaseException(
            ErrorCode.FILE_FIRST_LINE_ILLEGAL, ErrorCode.FILE_FIRST_LINE_ILLEGAL_MSG);
      }
      String[] headerCols = splitCsvLine(header);
      if (headerCols.length <= 1) {
        throw new BaseException(
            ErrorCode.FILE_FIRST_LINE_ILLEGAL, ErrorCode.FILE_FIRST_LINE_ILLEGAL_MSG);
      }
      for (int i = 1; i < headerCols.length; i++) {
        if (!headerCols[i].startsWith("root.")
            || StringUtils.countMatches(headerCols[i], ".") < 2) {
          throw new BaseException(
              ErrorCode.FILE_FIRST_LINE_ILLEGAL, ErrorCode.FILE_FIRST_LINE_ILLEGAL_MSG);
        }
      }

      List<String> devices = new ArrayList<>();
      List<Long> times = new ArrayList<>();
      List<List<String>> measurementsList = new ArrayList<>();
      List<List<String>> valuesList = new ArrayList<>();
      Map<String, List<Integer>> devicesToPositions = new HashMap<>();
      Map<String, List<String>> devicesToMeasurements = new HashMap<>();

      for (int i = 1; i < headerCols.length; i++) {
        splitColToDeviceAndMeasurement(
            headerCols[i], devicesToPositions, devicesToMeasurements, i);
      }

      SimpleDateFormat timeFormatter = null;
      boolean useFormatter = false;

      int lineNumber = 0;
      List<String> insertErrorInfo = new ArrayList<>();
      String line;
      while ((line = br.readLine()) != null) {
        String[] row = splitCsvLine(line);
        lineNumber++;
        if (lineNumber == 1) {
          // The time format is derived once, from the first data row.
          timeFormatter = formatterInit(row[0]);
          useFormatter = (timeFormatter != null);
        }
        for (Map.Entry<String, List<Integer>> deviceToPositions : devicesToPositions.entrySet()) {
          List<String> values = new ArrayList<>();
          for (int position : deviceToPositions.getValue()) {
            // A row shorter than the header raises ArrayIndexOutOfBoundsException here, which is
            // reported as FILE_FORMAT_ILLEGAL below.
            values.add(row[position]);
          }
          boolean isAllBlank = true;
          for (String value : values) {
            if (value != null && !"".equals(value)) {
              isAllBlank = false;
              break;
            }
          }
          if (isAllBlank) {
            continue;
          }
          valuesList.add(values);

          String device = deviceToPositions.getKey();
          devices.add(device);

          times.add(parseTime(row[0], useFormatter, timeFormatter));

          measurementsList.add(devicesToMeasurements.get(device));
        }
        if (lineNumber % batchSize == 0) {
          insertRecordsCollectingErrors(
              session, devices, times, measurementsList, valuesList, insertErrorInfo);
          devices.clear();
          times.clear();
          measurementsList.clear();
          valuesList.clear();
        }
      }
      // Send whatever is left over from the last, partially filled batch.
      insertRecordsCollectingErrors(
          session, devices, times, measurementsList, valuesList, insertErrorInfo);

      for (String s : insertErrorInfo) {
        bw.write(s + "\n");
      }
      int errorCount = insertErrorInfo.size();
      String fileDownloadUri = null;
      if (errorCount > 0) {
        fileDownloadUri = "/downloadFile/" + errorFileName;
        keepErrorFile = true;
      }
      // Use the header width: the last data row may legally carry extra trailing columns.
      Integer totalCount = lineNumber * (headerCols.length - 1);
      return new ImportDataVO(totalCount, errorCount, fileDownloadUri);
    } catch (FileNotFoundException e) {
      throw new BaseException(ErrorCode.UPLOAD_FILE_FAIL, ErrorCode.UPLOAD_FILE_FAIL_MSG);
    } catch (IOException e) {
      throw new BaseException(ErrorCode.FILE_IO_FAIL, ErrorCode.FILE_IO_FAIL_MSG + e.getMessage());
    } catch (ArrayIndexOutOfBoundsException e) {
      throw new BaseException(ErrorCode.FILE_FORMAT_ILLEGAL, ErrorCode.FILE_FORMAT_ILLEGAL_MSG);
    } finally {
      // The writer is already closed at this point, so the file can be removed safely.
      if (!keepErrorFile && errorFile.exists() && !errorFile.delete()) {
        log.info("Failed to delete unused error file:" + errorFileName);
      }
    }
  }

  /**
   * Sends one buffered batch of records and collects recoverable insert errors.
   *
   * <p>Does nothing when the batch is empty. A {@link StatementExecutionException} whose message
   * contains {@code "failed to insert measurements"} is split on the StorageEngineException
   * marker and its parts are appended to {@code insertErrorInfo}; any other statement failure and
   * any connection failure is rethrown as a {@link BaseException}.
   *
   * @param session the session used to insert the records
   * @param devices device of each buffered record
   * @param times time of each buffered record
   * @param measurementsList measurements of each buffered record
   * @param valuesList values of each buffered record
   * @param insertErrorInfo accumulator receiving the collected error messages
   * @throws BaseException if the insert fails for a reason that is not collected
   */
  private void insertRecordsCollectingErrors(
      Session session,
      List<String> devices,
      List<Long> times,
      List<List<String>> measurementsList,
      List<List<String>> valuesList,
      List<String> insertErrorInfo)
      throws BaseException {
    if (devices.isEmpty()) {
      return;
    }
    try {
      session.insertRecords(devices, times, measurementsList, valuesList);
    } catch (StatementExecutionException e) {
      String message = e.getMessage();
      if (message != null && message.contains("failed to insert measurements")) {
        insertErrorInfo.addAll(
            Arrays.asList(
                StringUtils.splitByWholeSeparator(
                    message, "org.apache.iotdb.db.exception.StorageEngineException: ")));
      } else {
        throw new BaseException(
            ErrorCode.IMPORT_CSV_FAIL, ErrorCode.IMPORT_CSV_FAIL_MSG + message);
      }
    } catch (IoTDBConnectionException e) {
      throw new BaseException(ErrorCode.GET_SESSION_FAIL, ErrorCode.GET_SESSION_FAIL_MSG);
    }
  }