private def maybeCommitTransaction()

in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala [169:183]


  /**
   * Decides what to do with the current transaction batch: commit it, abort it, or schedule a later check.
   *
   * `awaitingConfirmationValue` and `batchOffsets` are each read once, and the outcome is:
   *  - value is positive: demand is suspended and a timer keyed by `commitSchedulerKey` is scheduled
   *    after `messageDrainInterval`;
   *  - value is zero and the batch is a `NonemptyTransactionBatch`: the batch is handed to
   *    `commitTransaction`;
   *  - value is zero, the batch is an `EmptyTransactionBatch` and `abortEmptyTransactionOnComplete`
   *    is set: `abortTransaction` is called;
   *  - anything else: a timer keyed by `commitSchedulerKey` is scheduled after
   *    `producerSettings.eosCommitInterval`.
   *
   * @param beginNewTransaction forwarded unchanged to `commitTransaction`; presumably controls whether a
   *                            new transaction is opened after the commit (confirm against that method)
   * @param abortEmptyTransactionOnComplete when `true`, an empty batch with a zero value is aborted
   *                                        instead of falling through to the scheduling fallback
   */
  private def maybeCommitTransaction(beginNewTransaction: Boolean = true,
      abortEmptyTransactionOnComplete: Boolean = false): Unit = {
    val pending = awaitingConfirmationValue
    val currentBatch = batchOffsets
    if (pending > 0) {
      // Neither commit nor abort is attempted while the value is positive; back off and look again
      // after the drain interval.
      suspendDemand()
      scheduleOnce(commitSchedulerKey, messageDrainInterval)
    } else {
      // Commit and abort both require the value to be exactly zero.
      val nothingPending = pending == 0
      currentBatch match {
        case nonEmpty: NonemptyTransactionBatch if nothingPending =>
          commitTransaction(nonEmpty, beginNewTransaction)
        case _: EmptyTransactionBatch if nothingPending && abortEmptyTransactionOnComplete =>
          abortTransaction("Transaction is empty and stage is completing")
        case _ =>
          // Nothing to commit or abort right now: schedule the regular commit timer.
          scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
      }
    }
  }