in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalProducerStage.scala [169:183]
private def maybeCommitTransaction(beginNewTransaction: Boolean = true,
abortEmptyTransactionOnComplete: Boolean = false): Unit = {
val awaitingConf = awaitingConfirmationValue
batchOffsets match {
case batch: NonemptyTransactionBatch if awaitingConf == 0 =>
commitTransaction(batch, beginNewTransaction)
case _: EmptyTransactionBatch if awaitingConf == 0 && abortEmptyTransactionOnComplete =>
abortTransaction("Transaction is empty and stage is completing")
case _ if awaitingConf > 0 =>
suspendDemand()
scheduleOnce(commitSchedulerKey, messageDrainInterval)
case _ =>
scheduleOnce(commitSchedulerKey, producerSettings.eosCommitInterval)
}
}