public void close()

in flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java [930:978]


    /**
     * Shuts this producer down and releases the Kafka producers held by the current and the
     * pending transactions.
     *
     * <p>The sequence is:
     *
     * <ol>
     *   <li>If there is a current transaction, its records are flushed. For {@code AT_LEAST_ONCE}
     *       and {@code NONE} the producer of that transaction is additionally flushed and closed.
     *   <li>{@code super.close()} is invoked.
     *   <li>Any {@link Exception} raised by the two steps above is merged into {@code
     *       asyncException} instead of being thrown right away.
     *   <li>In all cases the producer of the current transaction and the producers of all pending
     *       transactions are closed; failures of these closes are only logged.
     *   <li>{@code checkErroneous()} runs last so that pending errors are propagated.
     * </ol>
     *
     * @throws FlinkKafkaException declared by this method; presumably raised by {@code
     *     checkErroneous()} when an error is pending (confirm against that method)
     */
    public void close() throws FlinkKafkaException {
        // Zero timeout handed to every producer.close(...) call below.
        final Duration noGracePeriod = Duration.ZERO;

        try {
            final KafkaTransactionState activeTxn = currentTransaction();
            if (activeTxn != null) {
                // Push out pending records first, so that aborting the transaction does not run
                // into exceptions because of them.
                flush(activeTxn);

                switch (semantic) {
                    case AT_LEAST_ONCE:
                    case NONE:
                        // Under these semantics the regular abort leaves the producer's resources
                        // alone because the producer is reused, hence the explicit close here.
                        activeTxn.producer.flush();
                        activeTxn.producer.close(noGracePeriod);
                        break;
                    case EXACTLY_ONCE:
                    default:
                        // No manual producer cleanup at this point.
                        break;
                }
            }
            super.close();
        } catch (Exception e) {
            // Remember the failure rather than rethrowing it; the cleanup in the finally block
            // runs next and checkErroneous() is called at its end.
            asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException);
        } finally {
            // The regular close routine above may have been cut short by an exception, so the
            // producer of the current transaction is closed (again) here.
            if (currentTransaction() != null) {
                try {
                    currentTransaction().producer.close(noGracePeriod);
                } catch (Throwable closeFailure) {
                    LOG.warn("Error closing producer.", closeFailure);
                }
            }

            // Producers that belong to pending transactions have to be closed as well.
            pendingTransactions().forEach(pending -> {
                try {
                    pending.getValue().producer.close(noGracePeriod);
                } catch (Throwable closeFailure) {
                    LOG.warn("Error closing producer.", closeFailure);
                }
            });

            // Propagate pending errors to the caller.
            checkErroneous();
        }
    }