in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala [121:142]
private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
case (sender, Committed(offsets)) =>
inFlightRecords.committed(offsets.view.mapValues(_.offset() - 1).toMap)
sender.tell(Done, sourceActor.ref)
case (sender, CommittingFailure) => {
log.info("Committing failed, resetting in flight offsets")
inFlightRecords.reset()
}
case (sender, Drain(partitions, ack, msg)) =>
if (inFlightRecords.empty(partitions)) {
log.debug(s"Partitions drained ${partitions.mkString(",")}")
ack.getOrElse(sender).tell(msg, sourceActor.ref)
} else {
log.debug(s"Draining partitions {}", partitions)
materializer.scheduleOnce(
consumerSettings.drainingCheckInterval,
new Runnable {
override def run(): Unit =
sourceActor.ref.tell(Drain(partitions, ack.orElse(Some(sender)), msg), sourceActor.ref)
})
}
}