in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java [91:138]
public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
throws IOException, InterruptedException {
for (CommitRequest<KafkaCommittable> request : requests) {
final KafkaCommittable committable = request.getCommittable();
final String transactionalId = committable.getTransactionalId();
LOG.debug("Committing Kafka transaction {}", transactionalId);
Optional<FlinkKafkaInternalProducer<?, ?>> writerProducer = committable.getProducer();
FlinkKafkaInternalProducer<?, ?> producer = null;
try {
producer = writerProducer.orElseGet(() -> getProducer(committable));
producer.commitTransaction();
backchannel.send(TransactionFinished.successful(committable.getTransactionalId()));
} catch (RetriableException e) {
LOG.warn(
"Encountered retriable exception while committing {}.", transactionalId, e);
request.retryLater();
} catch (ProducerFencedException e) {
logFencedRequest(request, e);
handleFailedTransaction(producer);
request.signalFailedWithKnownReason(e);
} catch (InvalidTxnStateException e) {
// This exception only occurs when aborting after a commit or vice versa.
// It does not appear on double commits or double aborts.
LOG.error(
"Unable to commit transaction ({}) because it's in an invalid state. "
+ "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
request,
e);
handleFailedTransaction(producer);
request.signalFailedWithKnownReason(e);
} catch (UnknownProducerIdException e) {
LOG.error(
"Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
request,
e);
handleFailedTransaction(producer);
request.signalFailedWithKnownReason(e);
} catch (Exception e) {
LOG.error(
"Transaction ({}) encountered error and data has been potentially lost.",
request,
e);
closeCommitterProducer(producer);
// cause failover
request.signalFailedWithUnknownReason(e);
}
}
}