in flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java [59:122]
public void commit(Collection<CommitRequest<KafkaCommittable>> requests)
        throws IOException, InterruptedException {
    // Requests are processed one by one in iteration order. The outcome of each request
    // is reported through the request's own callbacks (retryLater / signalFailedWith...).
    for (CommitRequest<KafkaCommittable> pending : requests) {
        commitSingleRequest(pending);
    }
}

/**
 * Commits the Kafka transaction referenced by one commit request.
 *
 * <p>The producer carried by the committable is used when one is present; otherwise a producer
 * is obtained via {@code getRecoveryProducer}. After a successful commit and flush the carried
 * {@code Recyclable} (if any) is closed.
 *
 * <p>Failure handling:
 *
 * <ul>
 *   <li>{@code RetriableException}: a warning is logged and the request is marked for retry;
 *       the carried {@code Recyclable} is not closed in this branch.
 *   <li>{@code ProducerFencedException}, {@code InvalidTxnStateException}, {@code
 *       UnknownProducerIdException}: an error is logged, the carried {@code Recyclable} is
 *       closed and the request is signalled as failed with a known reason.
 *   <li>Any other {@code Exception}: an error is logged, the carried {@code Recyclable} is
 *       closed and the request is signalled as failed with an unknown reason.
 * </ul>
 *
 * @param request the commit request wrapping the committable to commit
 */
private void commitSingleRequest(CommitRequest<KafkaCommittable> request)
        throws IOException, InterruptedException {
    final KafkaCommittable committable = request.getCommittable();
    final String txnId = committable.getTransactionalId();
    LOG.debug("Committing Kafka transaction {}", txnId);

    final Optional<Recyclable<? extends FlinkKafkaInternalProducer<?, ?>>> carried =
            committable.getProducer();
    try {
        // Resolve the producer inside the try so that lookup failures are routed through
        // the same error handling as commit failures.
        final Optional<FlinkKafkaInternalProducer<?, ?>> carriedProducer =
                carried.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject);
        final FlinkKafkaInternalProducer<?, ?> txnProducer =
                carriedProducer.orElseGet(() -> getRecoveryProducer(committable));

        txnProducer.commitTransaction();
        txnProducer.flush();
        carried.ifPresent(Recyclable::close);
    } catch (RetriableException retriable) {
        LOG.warn("Encountered retriable exception while committing {}.", txnId, retriable);
        request.retryLater();
    } catch (ProducerFencedException fenced) {
        // initTransaction has been called on this transaction before
        final String configuredTimeout =
                kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG);
        LOG.error(
                "Unable to commit transaction ({}) because its producer is already fenced."
                        + " This means that you either have a different producer with the same '{}' (this is"
                        + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
                        + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
                        + " please consult the Flink documentation for more details.",
                request,
                ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                KafkaSink.class.getSimpleName(),
                ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
                configuredTimeout,
                fenced);
        carried.ifPresent(Recyclable::close);
        request.signalFailedWithKnownReason(fenced);
    } catch (InvalidTxnStateException invalidState) {
        // This exception only occurs when aborting after a commit or vice versa.
        // It does not appear on double commits or double aborts.
        LOG.error(
                "Unable to commit transaction ({}) because it's in an invalid state. "
                        + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
                request,
                invalidState);
        carried.ifPresent(Recyclable::close);
        request.signalFailedWithKnownReason(invalidState);
    } catch (UnknownProducerIdException unknownProducer) {
        LOG.error(
                "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
                request,
                unknownProducer);
        carried.ifPresent(Recyclable::close);
        request.signalFailedWithKnownReason(unknownProducer);
    } catch (Exception unexpected) {
        // Catch-all for anything not handled by the more specific branches above.
        LOG.error(
                "Transaction ({}) encountered error and data has been potentially lost.",
                request,
                unexpected);
        carried.ifPresent(Recyclable::close);
        request.signalFailedWithUnknownReason(unexpected);
    }
}