in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/writer/topic/ProducerRegister.java [309:324]
/**
 * Requests an abort for every transaction currently held in {@code transactions} and then
 * empties that map.
 *
 * <p>The method is a no-op when {@code coordinatorClient} is {@code null} or when no transaction
 * is tracked. Otherwise one {@code coordinatorClient.abort(txnID)} call per transaction is
 * registered with a {@link Closer}; the registered calls are expected to run when the closer is
 * closed at the end of the try-with-resources statement, i.e. after the map has been cleared.
 *
 * @throws FlinkRuntimeException wrapping any {@link IOException} caught from the
 *     try-with-resources statement
 */
private void abortTransactions() {
    // Nothing can be aborted without a coordinator client.
    if (coordinatorClient == null) {
        return;
    }
    // Nothing to abort when no transaction is tracked.
    if (transactions.isEmpty()) {
        return;
    }

    try (Closer abortCloser = Closer.create()) {
        for (Transaction pending : transactions.values()) {
            // Read the id up front so the deferred call does not touch the transaction object.
            TxnID pendingId = pending.getTxnID();
            abortCloser.register(() -> coordinatorClient.abort(pendingId));
        }
        // The map is emptied before the deferred abort calls are executed.
        transactions.clear();
    } catch (IOException abortFailure) {
        throw new FlinkRuntimeException(abortFailure);
    }
}