in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [376:415]
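/**
 * Serializes the buffered Spark rows into one or more Stream Load payload
 * strings, formatted according to the configured file type ("CSV" or "JSON").
 */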
private List<String> parseLoadData(List<List<Object>> rows, String[] dfColumns) throws StreamLoadException, JsonProcessingException {
    List<String> loadDataList;
    switch (fileType.toUpperCase()) {
        case "CSV":
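            // Render each row as delimiter-joined fields and join rows with
            // LINE_DELIMITER; null fields are written as NULL_VALUE.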
            loadDataList = Collections.singletonList(rows.stream()
                    .map(row -> row.stream()
                            .map(field -> field == null ? NULL_VALUE : field.toString())
                            .collect(Collectors.joining(FIELD_DELIMITER)))
                    .collect(Collectors.joining(LINE_DELIMITER)));
            break;
        case "JSON":
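            // Build one map per row, keyed by the DataFrame column names,
            // and serialize the whole batch to JSON below.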
            List<Map<Object, Object>> dataList = new ArrayList<>();
            try {
                for (List<Object> row : rows) {
                    if (dfColumns.length != row.size()) {
                        // Fail fast on a schema mismatch instead of silently
                        // emitting an empty JSON object for this row; the
                        // exception is translated in the catch block below.
                        throw new IllegalArgumentException("column count mismatch");
                    }
                    Map<Object, Object> dataMap = new HashMap<>();
                    for (int i = 0; i < dfColumns.length; i++) {
                        Object col = row.get(i);
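                        // Write timestamps in their string form rather than
                        // relying on the JSON serializer's default date handling.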
                        if (col instanceof Timestamp) {
                            dataMap.put(dfColumns[i], col.toString());
                            continue;
                        }
                        dataMap.put(dfColumns[i], col);
                    }
                    dataList.add(dataMap);
                }
            } catch (Exception e) {
                throw new StreamLoadException("The number of configured columns does not match the number of data columns.");
            }
            // Split the large collection into smaller batches before serialization
            // to avoid the "Requested array size exceeds VM limit" exception.
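            // With read_json_by_line enabled, records are separated by
            // LINE_DELIMITER (one JSON object per line); otherwise no
            // delimiter is passed and the batch is serialized as a whole.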
            loadDataList = ListUtils.getSerializedList(dataList, readJsonByLine ? LINE_DELIMITER : null);
            break;
        default:
            throw new StreamLoadException(String.format("Unsupported file format in stream load: %s.", fileType));
    }
    return loadDataList;
}