in flink-connector-pulsar/src/main/java/org/apache/flink/connector/pulsar/sink/committer/PulsarCommitter.java [69:145]
public void commit(Collection<CommitRequest<PulsarCommittable>> requests)
throws PulsarClientException {
TransactionCoordinatorClient client = transactionCoordinatorClient();
for (CommitRequest<PulsarCommittable> request : requests) {
PulsarCommittable committable = request.getCommittable();
TxnID txnID = committable.getTxnID();
String topic = committable.getTopic();
LOG.debug("Start committing the Pulsar transaction {} for topic {}", txnID, topic);
try {
client.commit(txnID);
} catch (CoordinatorNotFoundException e) {
LOG.error(
"We couldn't find the Transaction Coordinator from Pulsar broker {}. "
+ "Check your broker configuration.",
committable,
e);
request.signalFailedWithKnownReason(e);
} catch (InvalidTxnStatusException e) {
LOG.error(
"Unable to commit transaction ({}) because it's in an invalid state. "
+ "Most likely the transaction has been aborted for some reason. "
+ "Please check the Pulsar broker logs for more details.",
committable,
e);
request.signalAlreadyCommitted();
} catch (TransactionNotFoundException e) {
if (request.getNumberOfRetries() == 0) {
LOG.error(
"Unable to commit transaction ({}) because it's not found on Pulsar broker. "
+ "Most likely the checkpoint interval exceed the transaction timeout.",
committable,
e);
request.signalFailedWithKnownReason(e);
} else {
LOG.warn(
"We can't find the transaction {} after {} retry committing. "
+ "This may mean that the transaction have been committed in previous but failed with timeout. "
+ "So we just mark it as committed.",
txnID,
request.getNumberOfRetries());
request.signalAlreadyCommitted();
}
} catch (MetaStoreHandlerNotExistsException e) {
LOG.error(
"We can't find the meta store handler by the mostSigBits from TxnID {}. "
+ "Did you change the metadata for topic {}?",
committable,
TRANSACTION_COORDINATOR_ASSIGN,
e);
request.signalFailedWithKnownReason(e);
} catch (TransactionCoordinatorClientException e) {
LOG.error(
"Encountered retriable exception while committing transaction {} for topic {}.",
committable,
topic,
e);
int maxRecommitTimes = sinkConfiguration.getMaxRecommitTimes();
if (request.getNumberOfRetries() < maxRecommitTimes) {
request.retryLater();
} else {
String message =
String.format(
"Failed to commit transaction %s after retrying %d times",
txnID, maxRecommitTimes);
request.signalFailedWithKnownReason(new FlinkRuntimeException(message, e));
}
} catch (Exception e) {
LOG.error(
"Transaction ({}) encountered unknown error and data could be potentially lost.",
committable,
e);
request.signalFailedWithUnknownReason(e);
}
}
}