in core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala [268:291]
override def performShutdown(): Unit = {
log.info("Completing. Partitions [{}], StageActor {}", subSources.keys.mkString(","), sourceActor.ref)
setKeepGoing(true)
// todo we should wait for subsources to be shutdown and next shutdown main stage
subSources.values.foreach {
_.control.shutdown()
}
if (!isClosed(shape.out)) {
complete(shape.out)
}
sourceActor.become {
case (_, Terminated(ref)) if ref == consumerActor =>
onShutdown()
completeStage()
case (_, msg) =>
log.warning("ignoring message [{}]", msg)
}
materializer.scheduleOnce(
settings.stopTimeout,
new Runnable {
override def run(): Unit =
consumerActor.tell(KafkaConsumerActor.Internal.StopFromStage(id), sourceActor.ref)
})
}