in core/src/main/scala/org/apache/pekko/kafka/internal/CommitCollectorStage.scala [70:89]
private def scheduleCommit(): Unit =
scheduleOnce(CommitNow, stage.committerSettings.maxInterval)
override protected def onTimer(timerKey: Any): Unit = {
var pushed = false
timerKey match {
case CommitCollectorStage.CommitNow =>
if (activeBatchInProgress) {
// Push only of the outlet is available, as timers may occur outside of a push/pull cycle.
// Otherwise instruct `onPull` to emit what is there when the next pull occurs.
// This is very hard to get tested consistently, so it gets this big comment instead.
if (isAvailable(stage.out)) {
pushDownStream(Interval)
pushed = true
} else pushOnNextPull = true
} else scheduleCommit()
case _ => log.warning("unexpected timer [{}]", timerKey)
}
if (!pushed) suspendContext()
}