private def collectOffset()

in core/src/main/scala/org/apache/pekko/kafka/internal/CommittingProducerSinkStage.scala [183:200]


  private def collectOffset(offset: Committable): Unit =
    if (updateBatch(offset)) commit(BatchSize)
    else if (isClosed(stage.in) && awaitingProduceResult == 0L) commit(UpstreamClosed)

  private def commit(triggeredBy: TriggerdBy): Unit = {
    if (offsetBatch.batchSize != 0) {
      log.debug("commit triggered by {} (awaitingProduceResult={} awaitingCommitResult={})",
        triggeredBy,
        awaitingProduceResult,
        awaitingCommitResult)
      val batchSize = offsetBatch.batchSize
      offsetBatch
        .commitInternal()
        .onComplete(t => commitResultCB.invoke(batchSize -> t))(materializer.executionContext)
      offsetBatch = CommittableOffsetBatch.empty
    }
    scheduleCommit()
  }