in spark-operator/src/main/java/org/apache/spark/k8s/operator/SparkOperator.java [167:190]
protected void overrideOperatorConfigs(ConfigurationServiceOverrider overrider) {
overrider.withKubernetesClient(client);
overrider.withStopOnInformerErrorDuringStartup(
SparkOperatorConf.TERMINATE_ON_INFORMER_FAILURE_ENABLED.getValue());
overrider.withTerminationTimeoutSeconds(
SparkOperatorConf.RECONCILER_TERMINATION_TIMEOUT_SECONDS.getValue());
int parallelism = SparkOperatorConf.RECONCILER_PARALLELISM.getValue();
if (parallelism > 0) {
log.info("Configuring operator with {} reconciliation threads.", parallelism);
overrider.withConcurrentReconciliationThreads(parallelism);
} else {
log.info("Configuring operator with unbounded reconciliation thread pool.");
overrider.withExecutorService(Executors.newCachedThreadPool());
}
if (SparkOperatorConf.LEADER_ELECTION_ENABLED.getValue()) {
overrider.withLeaderElectionConfiguration(SparkOperatorConf.getLeaderElectionConfig());
}
if (SparkOperatorConf.JOSDK_METRICS_ENABLED.getValue()) {
log.info("Adding Operator JosdkMetrics to metrics system.");
OperatorJosdkMetrics operatorJosdkMetrics = new OperatorJosdkMetrics();
overrider.withMetrics(operatorJosdkMetrics);
metricsSystem.registerSource(operatorJosdkMetrics);
}
}