override def preStart()

in core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala [87:107]


  override def preStart(): Unit = {
    super.preStart()
    log.info("Starting")
    sourceActor = getStageActor {
      case (_, Status.Failure(e)) =>
        failStage(e)
      case (_, Terminated(ref)) if ref == consumerActor =>
        failStage(new ConsumerFailed)
      case (_, msg) =>
        log.warning("ignoring message [{}]", msg)
    }
    consumerActor = {
      val extendedActorSystem = materializer.system.asInstanceOf[ExtendedActorSystem]
      extendedActorSystem.systemActorOf(pekko.kafka.KafkaConsumerActor.props(sourceActor.ref, settings),
        s"kafka-consumer-$actorNumber")
    }
    consumerPromise.success(consumerActor)
    sourceActor.watch(consumerActor)

    configureSubscription(partitionAssignedCB, partitionRevokedCB)
  }