in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [930:978]
public void close() throws FlinkKafkaException {
    // Shutdown sequence:
    //   1. flush the current transaction (if there is one) and, for the AT_LEAST_ONCE and NONE
    //      semantics, additionally flush and close its producer by hand;
    //   2. delegate to super.close();
    //   3. whatever happened above, close the producers of the current transaction and of all
    //      pending transactions; failures in this step are only logged;
    //   4. call checkErroneous() so that pending errors are propagated to the caller.
    // An exception raised in steps 1-2 is not thrown directly; it is merged into asyncException.

    // Every producer close below uses this same zero timeout.
    final Duration noWait = Duration.ofSeconds(0);

    try {
        final KafkaTransactionState activeTxn = currentTransaction();
        if (activeTxn != null) {
            // Flush first, to avoid exceptions on aborting a transaction that still has
            // pending records.
            flush(activeTxn);

            // The normal abort path for AT_LEAST_ONCE and NONE does not clean up resources
            // (the producer is reused), so the producer has to be closed manually here.
            switch (semantic) {
                case AT_LEAST_ONCE:
                case NONE:
                    activeTxn.producer.flush();
                    activeTxn.producer.close(noWait);
                    break;
                case EXACTLY_ONCE:
                    // No manual close for this semantic at this point.
                    break;
            }
        }
        super.close();
    } catch (Exception closeFailure) {
        // Remember the failure (combined with any earlier one) instead of throwing right away,
        // so that the cleanup in the finally block still runs before it is reported.
        asyncException = ExceptionUtils.firstOrSuppressed(closeFailure, asyncException);
    } finally {
        // The regular close routine may have been cut short by an exception, in which case the
        // producer of the current transaction may still have to be closed.
        if (currentTransaction() != null) {
            try {
                currentTransaction().producer.close(noWait);
            } catch (Throwable producerFailure) {
                LOG.warn("Error closing producer.", producerFailure);
            }
        }

        // Likewise make sure the producer of every pending transaction is closed.
        pendingTransactions()
                .forEach(
                        pending -> {
                            try {
                                pending.getValue().producer.close(noWait);
                            } catch (Throwable producerFailure) {
                                LOG.warn("Error closing producer.", producerFailure);
                            }
                        });

        // Finally, make sure pending errors are propagated.
        checkErroneous();
    }
}