private def scheduleCommit()

in core/src/main/scala/org/apache/pekko/kafka/internal/CommitCollectorStage.scala [70:89]


  private def scheduleCommit(): Unit =
    scheduleOnce(CommitNow, stage.committerSettings.maxInterval)

  override protected def onTimer(timerKey: Any): Unit = {
    var pushed = false
    timerKey match {
      case CommitCollectorStage.CommitNow =>
        if (activeBatchInProgress) {
          // Push only of the outlet is available, as timers may occur outside of a push/pull cycle.
          // Otherwise instruct `onPull` to emit what is there when the next pull occurs.
          // This is very hard to get tested consistently, so it gets this big comment instead.
          if (isAvailable(stage.out)) {
            pushDownStream(Interval)
            pushed = true
          } else pushOnNextPull = true
        } else scheduleCommit()
      case _ => log.warning("unexpected timer [{}]", timerKey)
    }
    if (!pushed) suspendContext()
  }