def initSparkConf()

in src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala [230:393]

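Builds the SparkConf for the Sparder query SparkSession: Kylin and Kerberos settings, YARN/K8s-specific options, resource-derived defaults such as shuffle partitions, executor classpath assembly, and event-log setup, all applied before the session is created.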

    def initSparkConf(sparkConf: SparkConf): SparkConf = {
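      // Return the conf untouched if it is flagged as coming from a user Kylin session.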
      if (sparkConf.getBoolean("user.kylin.session", defaultValue = false)) {
        return sparkConf
      }
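      // Disable the AM IP filter for the Spark UI; outside cloud deployments, also register
      // the executor memory-monitor plugin.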
      sparkConf.set("spark.amIpFilter.enabled", "false")
      if (!KylinConfig.getInstanceFromEnv.getChannel.equalsIgnoreCase("cloud")) {
        sparkConf.set("spark.executor.plugins", "org.apache.spark.memory.MonitorExecutorExtension")
      }
      // Kerberos: pass the keytab/principal to YARN and skip fetching Hive delegation tokens
      if (kapConfig.isKerberosEnabled) {
        sparkConf.set("spark.yarn.keytab", kapConfig.getKerberosKeytabPath)
        sparkConf.set("spark.yarn.principal", kapConfig.getKerberosPrincipal)
        sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
      }

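      // With Hadoop security enabled, talk to the Hive metastore over SASL.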
      if (UserGroupInformation.isSecurityEnabled) {
        sparkConf.set("hive.metastore.sasl.enabled", "true")
      }

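      // Apply the user-defined Spark conf overrides from Kylin's configuration.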
      kapConfig.getSparkConf.asScala.foreach {
        case (k, v) =>
          sparkConf.set(k, v)
      }

      // The generated `podNamePrefix` must be at most 47 characters (Spark's limit for executor pod name prefixes)
      sparkConf.get(SPARK_MASTER) match {
        case v if v.startsWith("k8s") =>
          val appName = sparkConf.get("spark.app.name", System.getenv("HOSTNAME"))
          val podNamePrefix = generateExecutorPodNamePrefixForK8s(appName)
          logInfo(s"Sparder run on k8s, generated executorPodNamePrefix is $podNamePrefix")
          sparkConf.setIfMissing("spark.kubernetes.executor.podNamePrefix", podNamePrefix)
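          // Label the executor (and, in cluster mode, driver) pods with the component and namespace.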
          val olapEngineNamespace = System.getenv("NAME_SPACE")
          sparkConf.set("spark.kubernetes.executor.label.component", "sparder-driver-executor")
          sparkConf.set("spark.kubernetes.executor.label.olap-engine-namespace", olapEngineNamespace)
          if (sparkConf.get("spark.submit.deployMode", "").equals("cluster")) {
            sparkConf.set("spark.kubernetes.driver.label.component", "sparder-driver-executor")
            sparkConf.set("spark.kubernetes.driver.label.olap-engine-namespace", olapEngineNamespace)
          }
        case _ =>
      }

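      // Default spark.sql.shuffle.partitions to the total executor core count (instances * cores).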
      val instances = sparkConf.get("spark.executor.instances").toInt
      val cores = sparkConf.get("spark.executor.cores").toInt
      val sparkCores = instances * cores
      if (sparkConf.get("spark.sql.shuffle.partitions", "").isEmpty) {
        sparkConf.set("spark.sql.shuffle.partitions", sparkCores.toString)
      }

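      // Recalibrate the big-query threshold based on the allocated executor resources.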
      BigQueryThresholdUpdater.initBigQueryThresholdBySparkResource(instances, cores)

      sparkConf.set("spark.debug.maxToStringFields", "1000")
      sparkConf.set("spark.scheduler.mode", "FAIR")
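      // Cartesian-join partition guard: default to sparkCores * configured factor,
      // unless a non-negative threshold is set explicitly in the conf.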
      val cartesianFactor = KylinConfig.getInstanceFromEnv.getCartesianPartitionNumThresholdFactor
      var cartesianPartitionThreshold = sparkCores * cartesianFactor
      val confThreshold = sparkConf.get("spark.sql.cartesianPartitionNumThreshold")
      if (confThreshold.nonEmpty && confThreshold.toInt >= 0) {
        cartesianPartitionThreshold = confThreshold.toInt
      }
      sparkConf.set("spark.sql.cartesianPartitionNumThreshold", cartesianPartitionThreshold.toString)

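      // Load the FAIR scheduler pool definitions from the Kylin conf directory.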
      val fairSchedulerConfigDirPath = KylinConfig.getKylinConfDir.getCanonicalPath
      applyFairSchedulerConfig(kapConfig, fairSchedulerConfigDirPath, sparkConf)

      if (kapConfig.isQueryEscapedLiteral) {
        sparkConf.set("spark.sql.parser.escapedStringLiterals", "true")
      }

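      // Everything below ships jars/files and JVM options to the cluster, so it is
      // skipped when running locally with -Dspark.local=true.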
      if (!"true".equalsIgnoreCase(System.getProperty("spark.local"))) {
        if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
          // TODO Less elegant implementation.
          val applicationJar = KylinConfig.getInstanceFromEnv.getKylinJobJarPath
          val yarnDistJarsConf = "spark.yarn.dist.jars"
          val distJars = if (sparkConf.contains(yarnDistJarsConf)) {
            s"${sparkConf.get(yarnDistJarsConf)},$applicationJar"
          } else {
            applicationJar
          }
          sparkConf.set(yarnDistJarsConf, distJars)
          sparkConf.set(SPARK_YARN_DIST_FILE, kapConfig.sparderFiles())
        } else {
          sparkConf.set("spark.jars", kapConfig.sparderJars)
          sparkConf.set("spark.files", kapConfig.sparderFiles())
        }

        // Spark on K8s in client mode: default spark.driver.host to the local IP if it is not set
        if (sparkConf.get(SPARK_MASTER).startsWith("k8s")
          && "client".equals(sparkConf.get("spark.submit.deployMode", "client"))
          && !sparkConf.contains("spark.driver.host")) {
          sparkConf.set("spark.driver.host", AddressUtil.getLocalHostExactAddress)
        }

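        // On FI/TDH Kerberos platforms, point executors and the YARN AM at the krb5.conf shipped
        // in the Spark conf archive; the Kylin timezone is always propagated to executors.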
        val krb5conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/krb5.conf"
        val executorExtraJavaOptions =
          sparkConf.get("spark.executor.extraJavaOptions", "")
        var executorKerberosConf = ""
        if (kapConfig.isKerberosEnabled && (kapConfig.getKerberosPlatform.equalsIgnoreCase(KapConfig.FI_PLATFORM)
          || kapConfig.getKerberosPlatform.equalsIgnoreCase(KapConfig.TDH_PLATFORM))) {
          executorKerberosConf = krb5conf
        }
        sparkConf.set("spark.executor.extraJavaOptions",
          s"$executorExtraJavaOptions -Duser.timezone=${kapConfig.getKylinConfig.getTimeZone} $executorKerberosConf")

        val yarnAMJavaOptions =
          sparkConf.get("spark.yarn.am.extraJavaOptions", "")
        var amKerberosConf = ""
        if (kapConfig.isKerberosEnabled && (kapConfig.getKerberosPlatform.equalsIgnoreCase(KapConfig.FI_PLATFORM)
          || kapConfig.getKerberosPlatform.equalsIgnoreCase(KapConfig.TDH_PLATFORM))) {
          amKerberosConf = krb5conf
        }
        sparkConf.set("spark.yarn.am.extraJavaOptions",
          s"$yarnAMJavaOptions $amKerberosConf")
      }

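      // Assemble spark.executor.extraClassPath: always include the Kylin job jar, and prepend
      // the Gluten jar when Gluten is enabled for queries.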
      var extraJars = Paths.get(KylinConfig.getInstanceFromEnv.getKylinJobJarPath).getFileName.toString
      if (KylinConfig.getInstanceFromEnv.queryUseGlutenEnabled) {
        if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
          val distFiles = sparkConf.get(SPARK_YARN_DIST_FILE)
          if (distFiles.isEmpty) {
            sparkConf.set(SPARK_YARN_DIST_FILE,
              sparkConf.get(SPARK_EXECUTOR_JAR_PATH))
          } else {
            sparkConf.set(SPARK_YARN_DIST_FILE,
              sparkConf.get(SPARK_EXECUTOR_JAR_PATH) + "," + distFiles)
          }
          extraJars = "gluten.jar" + File.pathSeparator + extraJars
        } else {
          extraJars = sparkConf.get(SPARK_EXECUTOR_JAR_PATH) +
            File.pathSeparator + extraJars
        }
      }
      sparkConf.set("spark.executor.extraClassPath", extraJars)

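      // Cap driver memory usage during result collection when a limit is configured.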
      if (KylinConfig.getInstanceFromEnv.getQueryMemoryLimitDuringCollect > 0L) {
        sparkConf.set("spark.sql.driver.maxMemoryUsageDuringCollect", KylinConfig.getInstanceFromEnv.getQueryMemoryLimitDuringCollect + "m")
      }

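      // When event logging is enabled, redirect it to the Sparder event log directory and
      // create the directory if it does not exist.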
      val eventLogEnabled = sparkConf.getBoolean("spark.eventLog.enabled", defaultValue = false)
      var logDir = sparkConf.get("spark.eventLog.dir", "")
      if (eventLogEnabled && logDir.nonEmpty) {
        logDir = ExtractFactory.create.getSparderEvenLogDir()
        sparkConf.set("spark.eventLog.dir", logDir)
        val logPath = new Path(new URI(logDir).getPath)
        val fs = HadoopUtil.getWorkingFileSystem()
        if (!fs.exists(logPath)) {
          fs.mkdirs(logPath)
        }
      }

      checkAndSetSparkPlugins(sparkConf)

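      // With the container scheduler enabled, prepend the ContainerInitializeListener to spark.extraListeners.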
      if (KylinConfig.getInstanceFromEnv.isContainerSchedulerEnabled) {
        ContainerInitializeListener.start()
        val key = "spark.extraListeners"
        val extraListeners = sparkConf.get(key, "")
        if (extraListeners.isEmpty) {
          sparkConf.set(key, "org.apache.spark.scheduler.ContainerInitializeListener")
        } else {
          sparkConf.set(key, "org.apache.spark.scheduler.ContainerInitializeListener," + extraListeners)
        }
      }

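      // Enable or disable Spark's periodic context-cleaner GC per Kylin's configuration.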
      sparkConf.set("spark.cleaner.periodicGC.enabled", KylinConfig.getInstanceFromEnv.sparkPeriodicGCEnabled())
      sparkConf
    }