private def applySettings()

in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [419:440]


  private def applySettings(updatedSettings: ConsumerSettings[K, V]): Unit = {
    this.settings = updatedSettings
    if (settings.connectionCheckerSettings.enable)
      context.actorOf(ConnectionChecker.props(settings.connectionCheckerSettings))
    pollTimeout = settings.pollTimeout.asJava
    offsetForTimesTimeout = settings.getOffsetForTimesTimeout
    positionTimeout = settings.getPositionTimeout
    val progressTrackingFactory: () => ConsumerProgressTracking = () => ensureProgressTracker()
    commitRefreshing = CommitRefreshing(settings.commitRefreshInterval, progressTrackingFactory)
    resetProtection = ConsumerResetProtection(log, settings.resetProtectionSettings, progressTrackingFactory)
    try {
      if (log.isDebugEnabled)
        log.debug(s"Creating Kafka consumer with ${settings.toString}")
      consumer = settings.consumerFactory.apply(settings)
      context.become(regularReceive)
      unstashAll()
    } catch {
      case e: Exception =>
        owner.foreach(_ ! Failure(e))
        throw e
    }
  }