in spark-doris-connector/src/main/java/org/apache/doris/spark/load/DorisStreamLoad.java [175:196]
/**
 * Converts the given rows into load payloads via {@code parseLoadData} and loads each payload
 * in order, collecting the transaction id returned by every successful {@code load} call.
 *
 * <p>If a load fails with a {@link StreamLoadException} while two-phase commit is enabled, every
 * transaction id collected so far is aborted on a best-effort basis: a failing abort does not
 * stop the remaining aborts, and is attached to the original exception as a suppressed exception.
 * The original load failure is always the exception that is rethrown.
 *
 * @param rows      rows to load; handed to {@code parseLoadData} together with {@code dfColumns}
 * @param dfColumns handed to {@code parseLoadData} (presumably the DataFrame column names —
 *                  confirm against that method)
 * @param enable2PC forwarded unchanged to {@code load}; when deciding whether to abort after a
 *                  failure, {@code null} is treated the same as {@code false}
 * @return the transaction ids returned by {@code load}, one per payload, in load order
 * @throws StreamLoadException     if loading any payload fails; abort failures, if any, are
 *                                 available through {@link Throwable#getSuppressed()}
 * @throws JsonProcessingException declared by the callees (presumably raised while serializing
 *                                 rows — confirm against {@code parseLoadData})
 */
public List<Integer> loadV2(List<List<Object>> rows, String[] dfColumns, Boolean enable2PC) throws StreamLoadException, JsonProcessingException {
    List<String> loadData = parseLoadData(rows, dfColumns);
    List<Integer> txnIds = new ArrayList<>(loadData.size());
    try {
        for (String data : loadData) {
            txnIds.add(load(data, enable2PC));
        }
    } catch (StreamLoadException e) {
        // Boolean.TRUE.equals avoids an unboxing NullPointerException (which would replace the
        // real load failure) when enable2PC is null.
        if (Boolean.TRUE.equals(enable2PC) && !txnIds.isEmpty()) {
            LOG.error("load batch failed, abort previously pre-committed transactions");
            for (Integer txnId : txnIds) {
                try {
                    abort(txnId);
                } catch (Exception abortFailure) {
                    // Deliberately broad: this is cleanup on an already-failing path. One failed
                    // abort must neither leave the remaining transactions un-aborted nor mask
                    // the load failure that triggered the cleanup.
                    LOG.error("abort transaction " + txnId + " failed: " + abortFailure.getMessage());
                    e.addSuppressed(abortFailure);
                }
            }
        }
        throw e;
    }
    return txnIds;
}