private def drainHandling: PartialFunction[(ActorRef, Any), Unit]

in core/src/main/scala/org/apache/pekko/kafka/internal/TransactionalSources.scala [121:142]


  /**
   * Partial function over `(sender, message)` pairs that keeps `inFlightRecords`
   * up to date and answers drain requests.
   *
   *  - `Committed(offsets)`: forwards each offset, decremented by one, to
   *    `inFlightRecords.committed` and replies `Done` to the sender.
   *  - `CommittingFailure`: logs the failure and resets `inFlightRecords`;
   *    no reply is sent.
   *  - `Drain(partitions, ack, msg)`: when `inFlightRecords.empty(partitions)`
   *    holds, `msg` is sent to `ack` if defined, otherwise to the sender.
   *    Otherwise the same `Drain` is re-sent to `sourceActor` after
   *    `consumerSettings.drainingCheckInterval`, with `ack` defaulting to the
   *    current sender so the eventual reply goes to the original requester.
   */
  private def drainHandling: PartialFunction[(ActorRef, Any), Unit] = {
    case (replyTo, Committed(committedOffsets)) =>
      // NOTE(review): the `- 1` presumably converts a "next offset to read" into the
      // last processed offset -- confirm against inFlightRecords.committed.
      val lastProcessed = committedOffsets.view.mapValues(_.offset() - 1).toMap
      inFlightRecords.committed(lastProcessed)
      replyTo.tell(Done, sourceActor.ref)

    case (_, CommittingFailure) =>
      log.info("Committing failed, resetting in flight offsets")
      inFlightRecords.reset()

    case (requester, Drain(partitions, ack, msg)) =>
      val self = sourceActor.ref
      if (!inFlightRecords.empty(partitions)) {
        log.debug("Draining partitions {}", partitions)
        // Not drained yet: ask ourselves again later, remembering who to answer.
        val recheck: Runnable = () => self.tell(Drain(partitions, ack.orElse(Some(requester)), msg), self)
        materializer.scheduleOnce(consumerSettings.drainingCheckInterval, recheck)
      } else {
        log.debug(s"Partitions drained ${partitions.mkString(",")}")
        val target = ack.getOrElse(requester)
        target.tell(msg, self)
      }
  }