in backend/src/main/java/org/apache/iotdb/admin/tool/ImportCsv.java [117:245]
/**
 * Imports the data rows of a CSV file through the given session.
 *
 * <p>The first line is the header: column 0 is the time column and every further column must be
 * a path that starts with {@code root.} and contains at least two dots. Each following line is
 * split into one record per device; a device whose cells are all empty on a row is skipped for
 * that row. Records are sent with {@code session.insertRecords} every 10000 data lines and once
 * more at the end of the file.
 *
 * <p>Insert failures whose message contains "failed to insert measurements" do not abort the
 * import; their fragments are collected and, if there are any, written to an error file whose
 * download URI is returned in the result.
 *
 * @param file the uploaded CSV file, read as UTF-8
 * @param session the session used to insert the records
 * @return the total cell count (data lines times header measurement columns), the number of
 *     collected error fragments, and the error file URI ({@code null} when there are no errors)
 * @throws BaseException if the header is missing or malformed, a row has fewer cells than the
 *     header requires, the file cannot be read or the error file cannot be written, or the
 *     insert fails for any other reason
 */
private ImportDataVO loadDataFromCSV(File file, Session session) throws BaseException {
  log.info("Start import data from file:" + file.getName());
  final int batchSize = 10000;
  String errorFileName = "error" + System.currentTimeMillis() + ".txt";
  File errorFile = new File(this.fileStorageLocation.resolve(errorFileName).toString());
  try (BufferedReader br =
      new BufferedReader(
          new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
    String header = br.readLine();
    // An empty file has no header line at all; report it as an illegal first line instead of
    // handing null to the splitter.
    if (header == null) {
      throw new BaseException(
          ErrorCode.FILE_FIRST_LINE_ILLEGAL, ErrorCode.FILE_FIRST_LINE_ILLEGAL_MSG);
    }
    String[] headerCols = splitCsvLine(header);
    if (headerCols.length <= 1) {
      throw new BaseException(
          ErrorCode.FILE_FIRST_LINE_ILLEGAL, ErrorCode.FILE_FIRST_LINE_ILLEGAL_MSG);
    }
    for (int i = 1; i < headerCols.length; i++) {
      if (!headerCols[i].startsWith("root.")
          || StringUtils.countMatches(headerCols[i], ".") < 2) {
        throw new BaseException(
            ErrorCode.FILE_FIRST_LINE_ILLEGAL, ErrorCode.FILE_FIRST_LINE_ILLEGAL_MSG);
      }
    }

    // Filled by splitColToDeviceAndMeasurement; used below as device -> column positions and
    // device -> measurement names.
    Map<String, List<Integer>> devicesToPositions = new HashMap<>();
    Map<String, List<String>> devicesToMeasurements = new HashMap<>();
    for (int i = 1; i < headerCols.length; i++) {
      splitColToDeviceAndMeasurement(
          headerCols[i], devicesToPositions, devicesToMeasurements, i);
    }

    List<String> devices = new ArrayList<>();
    List<Long> times = new ArrayList<>();
    List<List<String>> measurementsList = new ArrayList<>();
    List<List<String>> valuesList = new ArrayList<>();
    List<String> insertErrorInfo = new ArrayList<>();

    SimpleDateFormat timeFormatter = null;
    boolean useFormatter = false;
    int lineNumber = 0;
    String line;
    while ((line = br.readLine()) != null) {
      String[] cols = splitCsvLine(line);
      lineNumber++;
      if (lineNumber == 1) {
        // The time format is decided once, from the first data row; a null formatter means the
        // time column is parsed without one.
        timeFormatter = formatterInit(cols[0]);
        useFormatter = (timeFormatter != null);
      }
      for (Map.Entry<String, List<Integer>> deviceToPositions : devicesToPositions.entrySet()) {
        List<String> values = new ArrayList<>();
        boolean hasValue = false;
        for (int position : deviceToPositions.getValue()) {
          // A row shorter than the header raises ArrayIndexOutOfBoundsException here, which is
          // mapped to FILE_FORMAT_ILLEGAL below.
          String value = cols[position];
          values.add(value);
          if (value != null && !"".equals(value)) {
            hasValue = true;
          }
        }
        if (!hasValue) {
          continue;
        }
        String device = deviceToPositions.getKey();
        valuesList.add(values);
        devices.add(device);
        times.add(parseTime(cols[0], useFormatter, timeFormatter));
        measurementsList.add(devicesToMeasurements.get(device));
      }
      if (lineNumber % batchSize == 0) {
        insertBatchCollectingErrors(
            session, devices, times, measurementsList, valuesList, insertErrorInfo);
        devices = new ArrayList<>();
        times = new ArrayList<>();
        measurementsList = new ArrayList<>();
        valuesList = new ArrayList<>();
      }
    }
    // Send whatever is left after the last full batch (no-op when nothing is pending).
    insertBatchCollectingErrors(
        session, devices, times, measurementsList, valuesList, insertErrorInfo);

    int errorCount = insertErrorInfo.size();
    String fileDownloadUri = null;
    if (errorCount > 0) {
      // Only create the error file when there is something to report, so successful imports
      // do not leave empty files behind.
      writeImportErrorFile(errorFile, insertErrorInfo);
      fileDownloadUri = "/downloadFile/" + errorFileName;
    }
    // Use the header width: the per-row array may differ in length from the header.
    Integer totalCount = lineNumber * (headerCols.length - 1);
    return new ImportDataVO(totalCount, errorCount, fileDownloadUri);
  } catch (FileNotFoundException e) {
    throw new BaseException(ErrorCode.UPLOAD_FILE_FAIL, ErrorCode.UPLOAD_FILE_FAIL_MSG);
  } catch (IOException e) {
    throw new BaseException(ErrorCode.FILE_IO_FAIL, ErrorCode.FILE_IO_FAIL_MSG + e.getMessage());
  } catch (ArrayIndexOutOfBoundsException e) {
    throw new BaseException(ErrorCode.FILE_FORMAT_ILLEGAL, ErrorCode.FILE_FORMAT_ILLEGAL_MSG);
  }
}

/**
 * Sends one batch of records and collects per-measurement insert failures instead of aborting.
 *
 * <p>Does nothing when the batch is empty. A {@code StatementExecutionException} whose message
 * contains "failed to insert measurements" is split on the storage-engine exception prefix and
 * appended to {@code insertErrorInfo}; any other failure is rethrown as a {@code BaseException}.
 */
private void insertBatchCollectingErrors(
    Session session,
    List<String> devices,
    List<Long> times,
    List<List<String>> measurementsList,
    List<List<String>> valuesList,
    List<String> insertErrorInfo)
    throws BaseException {
  if (devices.isEmpty()) {
    return;
  }
  try {
    session.insertRecords(devices, times, measurementsList, valuesList);
  } catch (StatementExecutionException e) {
    String message = e.getMessage();
    // The message may be null; treat that as an unrecognised failure rather than crashing.
    if (message != null && message.contains("failed to insert measurements")) {
      insertErrorInfo.addAll(
          Arrays.asList(
              StringUtils.splitByWholeSeparator(
                  message, "org.apache.iotdb.db.exception.StorageEngineException: ")));
    } else {
      throw new BaseException(
          ErrorCode.IMPORT_CSV_FAIL, ErrorCode.IMPORT_CSV_FAIL_MSG + message);
    }
  } catch (IoTDBConnectionException e) {
    throw new BaseException(ErrorCode.GET_SESSION_FAIL, ErrorCode.GET_SESSION_FAIL_MSG);
  }
}

/** Writes the collected insert error fragments to the error file, one per line, as UTF-8. */
private void writeImportErrorFile(File errorFile, List<String> insertErrorInfo)
    throws IOException {
  // Fully-qualified to avoid depending on an import that may not be present in this file.
  try (BufferedWriter bw =
      java.nio.file.Files.newBufferedWriter(errorFile.toPath(), StandardCharsets.UTF_8)) {
    for (String s : insertErrorInfo) {
      bw.write(s);
      bw.write("\n");
    }
  }
}