in core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala [110:120]
protected def closeProducer(): Unit =
if (producerSettings.closeProducerOnStop && producerAssignmentLifecycle == Assigned) {
try {
// we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
producer.flush()
producer.close(producerSettings.closeTimeout.asJava)
log.debug("Producer closed")
} catch {
case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
}
}