protected def closeProducer()

in core/src/main/scala/org/apache/pekko/kafka/internal/DeferredProducer.scala [110:120]


  /**
   * Flushes and closes the producer, but only when `producerSettings.closeProducerOnStop`
   * is set and the producer assignment lifecycle is `Assigned`; otherwise does nothing.
   *
   * Non-fatal failures are logged and not rethrown, so shutdown is best-effort.
   * `close` is attempted even when `flush` fails, so that a flush error cannot
   * leave the producer open.
   */
  protected def closeProducer(): Unit =
    if (producerSettings.closeProducerOnStop && producerAssignmentLifecycle == Assigned) {
      try {
        // we do not have to check if producer was already closed in send-callback as `flush()` and `close()` are effectively no-ops in this case
        try producer.flush()
        finally
          // Runs even when `flush()` throws: a failed flush must not skip the close.
          // If both fail, the exception from `close` is the one reported below.
          producer.close(producerSettings.closeTimeout.asJava)
        // Only reached when both flush and close completed without throwing.
        log.debug("Producer closed")
      } catch {
        case NonFatal(ex) => log.error(ex, "Problem occurred during producer close")
      }
    }