in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/JdbcXaSinkFunction.java [290:309]
/**
 * Ends and prepares the transaction identified by {@code currentXid} on behalf of the given
 * checkpoint.
 *
 * <p>Steps, in order: verify that {@code currentXid} is set and is the last element of {@code
 * hangingXids}; remove that element; flush {@code outputFormat}; call {@code
 * xaFacade.endAndPrepare}. When the prepare call returns normally, the (checkpoint, xid) pair
 * is appended to {@code preparedXids}. When it reports an empty transaction, the xid is only
 * logged and nothing is recorded. In both of these cases {@code currentXid} is reset to {@code
 * null} afterwards.
 *
 * @param checkpointId id of the checkpoint the prepared transaction is associated with
 * @throws IOException declared for failures raised while flushing, or surfaced through {@code
 *     ExceptionUtils.rethrowIOException} when end/prepare fails
 * @throws IllegalStateException presumably raised by {@code Preconditions.checkState} when
 *     either precondition is violated (confirm against the Preconditions utility)
 */
private void prepareCurrentTx(long checkpointId) throws IOException {
    Preconditions.checkState(currentXid != null, "no current xid");

    // The transaction being prepared must be the most recently added hanging one.
    final boolean currentIsLastHanging =
            !hangingXids.isEmpty() && hangingXids.peekLast().equals(currentXid);
    Preconditions.checkState(currentIsLastHanging, "inconsistent internal state");

    // Drop the xid from the hanging queue before flushing and preparing.
    hangingXids.pollLast();
    outputFormat.flush();

    try {
        xaFacade.endAndPrepare(currentXid);
        // Recorded only after end/prepare returned without throwing.
        preparedXids.add(CheckpointAndXid.createNew(checkpointId, currentXid));
    } catch (EmptyXaTransactionException emptyTx) {
        // Nothing was written in this transaction: log it and record nothing.
        LOG.info(
                "empty XA transaction (skip), xid: {}, checkpoint {}",
                currentXid,
                checkpointId);
    } catch (Exception failure) {
        ExceptionUtils.rethrowIOException(failure);
    }

    // Not in a finally block on purpose: this line is skipped if an exception propagates.
    currentXid = null;
}