override def performShutdown()

in core/src/main/scala/org/apache/pekko/kafka/internal/SubSourceLogic.scala [268:291]


  /**
   * Starts shutting this stage down.
   *
   * In order: logs the partitions currently held in `subSources`, asks every
   * sub-source control to shut down, completes the main outlet if it is still
   * open, swaps the stage actor behaviour for one that only reacts to the
   * consumer actor's termination, and finally schedules a `StopFromStage`
   * message to the consumer actor after `settings.stopTimeout`.
   *
   * The stage itself is completed only from the `Terminated` handler installed
   * here, after `onShutdown()` has been invoked.
   */
  override def performShutdown(): Unit = {
    val heldPartitions = subSources.keys.mkString(",")
    log.info("Completing. Partitions [{}], StageActor {}", heldPartitions, sourceActor.ref)

    // The outlet is completed below, but the stage must stay around until the
    // consumer actor's Terminated signal has been handled.
    setKeepGoing(true)

    // todo we should wait for subsources to be shutdown and next shutdown main stage
    for (subSource <- subSources.values) subSource.control.shutdown()

    if (!isClosed(shape.out)) complete(shape.out)

    // From here on only the consumer actor's termination matters; any other
    // message is logged and dropped.
    sourceActor.become {
      case (_, Terminated(stopped)) if stopped == consumerActor =>
        onShutdown()
        completeStage()
      case (_, other) =>
        log.warning("ignoring message [{}]", other)
    }

    // Deferred stop request, sent on behalf of the stage actor.
    // NOTE(review): presumably this makes the consumer actor stop, which then
    // triggers the Terminated case above — confirm against KafkaConsumerActor.
    val requestConsumerStop: Runnable = new Runnable {
      override def run(): Unit =
        consumerActor.tell(KafkaConsumerActor.Internal.StopFromStage(id), sourceActor.ref)
    }
    materializer.scheduleOnce(settings.stopTimeout, requestConsumerStop)
  }