in src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java [112:139]
/**
 * Queries Doris for the transactions whose label matches this writer's label prefix and
 * collects the status of each one.
 *
 * <p>The statement is produced by formatting {@code TRANSACTION_LABEL_PATTEN} with the
 * configured database name and the label prefix from {@code labelGenerator}. Each result row
 * contributes one entry, keyed by its {@code Label} column with the {@code TransactionStatus}
 * column as the value; if the same label appears more than once, the last row read wins.
 *
 * @return a mutable map from label to transaction status; empty when the query returns no rows
 * @throws StreamLoadException if the connection cannot be obtained, or if preparing, executing
 *     or reading the query fails for any reason (the original exception is kept as the cause)
 */
public Map<String, String> fetchLabel2Status() {
    final String database = dorisOptions.getDatabase();
    final String labelPrefix = labelGenerator.buildLabelPrefix();
    final String sql = String.format(TRANSACTION_LABEL_PATTEN, database, labelPrefix);
    LOG.info("query doris offset by sql: {}", sql);

    final Map<String, String> statusByLabel = new HashMap<>();
    // The connection, statement and result set are all closed when this block exits.
    try (Connection conn = connectionProvider.getOrEstablishConnection();
            PreparedStatement statement = conn.prepareStatement(sql);
            ResultSet rows = statement.executeQuery()) {
        while (rows.next()) {
            statusByLabel.put(rows.getString("Label"), rows.getString("TransactionStatus"));
        }
    } catch (Exception e) {
        // The same text is both logged and carried by the rethrown exception.
        final String failure =
                "Unable to obtain the label generated when importing data through stream load from doris, "
                        + "causing the doris kafka connector to not guarantee exactly once.";
        LOG.warn(failure, e);
        throw new StreamLoadException(failure, e);
    }
    return statusByLabel;
}