in core/src/main/scala/org/apache/pekko/kafka/internal/KafkaConsumerActor.scala [419:440]
private def applySettings(updatedSettings: ConsumerSettings[K, V]): Unit = {
this.settings = updatedSettings
if (settings.connectionCheckerSettings.enable)
context.actorOf(ConnectionChecker.props(settings.connectionCheckerSettings))
pollTimeout = settings.pollTimeout.asJava
offsetForTimesTimeout = settings.getOffsetForTimesTimeout
positionTimeout = settings.getPositionTimeout
val progressTrackingFactory: () => ConsumerProgressTracking = () => ensureProgressTracker()
commitRefreshing = CommitRefreshing(settings.commitRefreshInterval, progressTrackingFactory)
resetProtection = ConsumerResetProtection(log, settings.resetProtectionSettings, progressTrackingFactory)
try {
if (log.isDebugEnabled)
log.debug(s"Creating Kafka consumer with ${settings.toString}")
consumer = settings.consumerFactory.apply(settings)
context.become(regularReceive)
unstashAll()
} catch {
case e: Exception =>
owner.foreach(_ ! Failure(e))
throw e
}
}