in core/src/main/scala/org/apache/pekko/kafka/internal/BaseSingleSourceLogic.scala [102:111]
private def pump(): Unit = {
  // Drives the outlet: emits the next buffered element when the outlet can
  // accept one, otherwise asks for more messages if no request is pending
  // and `tps` is non-empty. Does nothing while the outlet is unavailable.
  val outletReady = isAvailable(shape.out)
  if (outletReady && buffer.hasNext) {
    val record = buffer.next()
    push(shape.out, createMessage(record))
    // Re-check the outlet and buffer state after the push; the call stays in
    // tail position so it can be compiled as a loop.
    pump()
  } else if (outletReady && !requested && tps.nonEmpty) {
    // Buffer is drained: request more only when nothing is already requested.
    requestMessages()
  }
}