in flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/xa/XaGroupOpsImpl.java [108:123]
public void recoverAndRollback(RuntimeContext runtimeContext, XidGenerator xidGenerator) {
    // Best-effort cleanup of transactions reported by xaFacade.recover():
    // every recovered xid that xidGenerator attributes to this subtask (as
    // identified by runtimeContext) is rolled back. A failed rollback is
    // logged and skipped so the remaining xids are still processed.
    final Collection<Xid> recoveredXids = xaFacade.recover();
    if (!recoveredXids.isEmpty()) {
        // The count logged here covers all recovered xids, including those
        // that the ownership check below will skip.
        LOG.warn("rollback {} recovered transactions", recoveredXids.size());
        for (Xid candidate : recoveredXids) {
            if (!xidGenerator.belongsToSubtask(candidate, runtimeContext)) {
                // Not attributed to this subtask: leave it untouched.
                continue;
            }
            try {
                xaFacade.rollback(candidate);
            } catch (Exception rollbackFailure) {
                // Deliberately not rethrown: keep going with the other xids.
                LOG.info(
                        "unable to rollback recovered transaction, xid={}",
                        candidate,
                        rollbackFailure);
            }
        }
    }
}