in core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala [87:107]
override def preStart(): Unit = {
super.preStart()
log.info("Starting")
sourceActor = getStageActor {
case (_, Status.Failure(e)) =>
failStage(e)
case (_, Terminated(ref)) if ref == consumerActor =>
failStage(new ConsumerFailed)
case (_, msg) =>
log.warning("ignoring message [{}]", msg)
}
consumerActor = {
val extendedActorSystem = materializer.system.asInstanceOf[ExtendedActorSystem]
extendedActorSystem.systemActorOf(pekko.kafka.KafkaConsumerActor.props(sourceActor.ref, settings),
s"kafka-consumer-$actorNumber")
}
consumerPromise.success(consumerActor)
sourceActor.watch(consumerActor)
configureSubscription(partitionAssignedCB, partitionRevokedCB)
}