in core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala [183:200]
private def collectOffset(offset: Committable): Unit =
if (updateBatch(offset)) commit(BatchSize)
else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit(UpstreamClosed)
private def commit(triggeredBy: TriggerdBy): Unit = {
if (offsetBatch.batchSize != 0) {
log.debug("commit triggered by {} (awaitingProduceResult={} awaitingCommitResult={})",
triggeredBy,
awaitingProduceResult,
awaitingCommitResult)
val batchSize = offsetBatch.batchSize
offsetBatch
.commitInternal()
.onComplete(t => commitResultCB.invoke(batchSize -> t))(materializer.executionContext)
offsetBatch = CommittableOffsetBatch.empty
}
scheduleCommit()
}