in src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala [230:393]
def initSparkConf(sparkConf: SparkConf): SparkConf = {
if (sparkConf.getBoolean("user.kylin.session", defaultValue = false)) {
return sparkConf
}
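    // Keep the YARN AM IP proxy filter off the Spark UI so it is reachable directly.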
sparkConf.set("spark.amIpFilter.enabled", "false")
if (!KylinConfig.getInstanceFromEnv.getChannel.equalsIgnoreCase("cloud")) {
sparkConf.set("spark.executor.plugins", "org.apache.spark.memory.MonitorExecutorExtension")
}
    // Kerberos: hand the keytab and principal to YARN, and disable Spark's Hive
    // delegation token fetching since Sparder authenticates with its own keytab.
if (kapConfig.isKerberosEnabled) {
sparkConf.set("spark.yarn.keytab", kapConfig.getKerberosKeytabPath)
sparkConf.set("spark.yarn.principal", kapConfig.getKerberosPrincipal)
sparkConf.set("spark.yarn.security.credentials.hive.enabled", "false")
}
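    // On a secured Hadoop cluster, the Hive metastore connection must use SASL.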
if (UserGroupInformation.isSecurityEnabled) {
sparkConf.set("hive.metastore.sasl.enabled", "true")
}
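    // Overlay the Spark conf entries from kylin.properties; they win over the defaults above.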
kapConfig.getSparkConf.asScala.foreach {
case (k, v) =>
sparkConf.set(k, v)
}
    // Spark on K8s caps the executor pod name prefix at 47 characters
    // (a DNS label with room left for the "-exec-<id>" suffix).
sparkConf.get(SPARK_MASTER) match {
case v if v.startsWith("k8s") =>
val appName = sparkConf.get("spark.app.name", System.getenv("HOSTNAME"))
val podNamePrefix = generateExecutorPodNamePrefixForK8s(appName)
logInfo(s"Sparder run on k8s, generated executorPodNamePrefix is $podNamePrefix")
sparkConf.setIfMissing("spark.kubernetes.executor.podNamePrefix", podNamePrefix)
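        // Label the executor pods (and, in cluster mode, the driver pod) so they can
        // be selected by component and namespace.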
val olapEngineNamespace = System.getenv("NAME_SPACE")
sparkConf.set("spark.kubernetes.executor.label.component", "sparder-driver-executor")
sparkConf.set("spark.kubernetes.executor.label.olap-engine-namespace", olapEngineNamespace)
if (sparkConf.get("spark.submit.deployMode", "").equals("cluster")) {
sparkConf.set("spark.kubernetes.driver.label.component", "sparder-driver-executor")
sparkConf.set("spark.kubernetes.driver.label.olap-engine-namespace", olapEngineNamespace)
}
case _ =>
}
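    // Size spark.sql.shuffle.partitions to the total executor core count
    // (instances * cores) unless it was set explicitly.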
val instances = sparkConf.get("spark.executor.instances").toInt
val cores = sparkConf.get("spark.executor.cores").toInt
val sparkCores = instances * cores
if (sparkConf.get("spark.sql.shuffle.partitions", "").isEmpty) {
sparkConf.set("spark.sql.shuffle.partitions", sparkCores.toString)
}
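    // Recalibrate the big-query threshold against the allocated executor resources.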
BigQueryThresholdUpdater.initBigQueryThresholdBySparkResource(instances, cores)
sparkConf.set("spark.debug.maxToStringFields", "1000")
sparkConf.set("spark.scheduler.mode", "FAIR")
val cartesianFactor = KylinConfig.getInstanceFromEnv.getCartesianPartitionNumThresholdFactor
var cartesianPartitionThreshold = sparkCores * cartesianFactor
    val confThreshold = sparkConf.get("spark.sql.cartesianPartitionNumThreshold", "")
if (confThreshold.nonEmpty && confThreshold.toInt >= 0) {
cartesianPartitionThreshold = confThreshold.toInt
}
sparkConf.set("spark.sql.cartesianPartitionNumThreshold", cartesianPartitionThreshold.toString)
val fairSchedulerConfigDirPath = KylinConfig.getKylinConfDir.getCanonicalPath
applyFairSchedulerConfig(kapConfig, fairSchedulerConfigDirPath, sparkConf)
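    // When configured, parse string literals with backslash escaping (legacy SQL behavior).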
if (kapConfig.isQueryEscapedLiteral) {
sparkConf.set("spark.sql.parser.escapedStringLiterals", "true")
}
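    // The settings below only apply when Spark is not running in local mode.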
if (!"true".equalsIgnoreCase(System.getProperty("spark.local"))) {
if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
// TODO Less elegant implementation.
val applicationJar = KylinConfig.getInstanceFromEnv.getKylinJobJarPath
val yarnDistJarsConf = "spark.yarn.dist.jars"
val distJars = if (sparkConf.contains(yarnDistJarsConf)) {
s"${sparkConf.get(yarnDistJarsConf)},$applicationJar"
} else {
applicationJar
}
sparkConf.set(yarnDistJarsConf, distJars)
sparkConf.set(SPARK_YARN_DIST_FILE, kapConfig.sparderFiles())
} else {
sparkConf.set("spark.jars", kapConfig.sparderJars)
sparkConf.set("spark.files", kapConfig.sparderFiles())
}
      // Spark on K8s in client mode: default spark.driver.host to the local IP so
      // executors can reach the driver.
if (sparkConf.get(SPARK_MASTER).startsWith("k8s")
&& "client".equals(sparkConf.get("spark.submit.deployMode", "client"))
&& !sparkConf.contains("spark.driver.host")) {
sparkConf.set("spark.driver.host", AddressUtil.getLocalHostExactAddress)
}
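      // On the FI/TDH platforms, the krb5.conf shipped inside the Hadoop conf archive
      // has to be passed to the executor and YARN AM JVMs explicitly.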
      val krb5conf = " -Djava.security.krb5.conf=./__spark_conf__/__hadoop_conf__/krb5.conf"
      val kerberosConf = if (kapConfig.isKerberosEnabled
        && (kapConfig.getKerberosPlatform.equalsIgnoreCase(KapConfig.FI_PLATFORM)
        || kapConfig.getKerberosPlatform.equalsIgnoreCase(KapConfig.TDH_PLATFORM))) {
        krb5conf
      } else {
        ""
      }
      val executorExtraJavaOptions = sparkConf.get("spark.executor.extraJavaOptions", "")
      sparkConf.set("spark.executor.extraJavaOptions",
        s"$executorExtraJavaOptions -Duser.timezone=${kapConfig.getKylinConfig.getTimeZone} $kerberosConf")
      val yarnAMJavaOptions = sparkConf.get("spark.yarn.am.extraJavaOptions", "")
      sparkConf.set("spark.yarn.am.extraJavaOptions", s"$yarnAMJavaOptions $kerberosConf")
}
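    // Executor extra classpath: the job jar by file name (it is shipped with the
    // application), with the Gluten jar prepended when Gluten is enabled for queries.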
var extraJars = Paths.get(KylinConfig.getInstanceFromEnv.getKylinJobJarPath).getFileName.toString
if (KylinConfig.getInstanceFromEnv.queryUseGlutenEnabled) {
if (sparkConf.get(SPARK_MASTER).startsWith("yarn")) {
val distFiles = sparkConf.get(SPARK_YARN_DIST_FILE)
if (distFiles.isEmpty) {
sparkConf.set(SPARK_YARN_DIST_FILE,
sparkConf.get(SPARK_EXECUTOR_JAR_PATH))
} else {
sparkConf.set(SPARK_YARN_DIST_FILE,
sparkConf.get(SPARK_EXECUTOR_JAR_PATH) + "," + distFiles)
}
extraJars = "gluten.jar" + File.pathSeparator + extraJars
} else {
extraJars = sparkConf.get(SPARK_EXECUTOR_JAR_PATH) +
File.pathSeparator + extraJars
}
}
sparkConf.set("spark.executor.extraClassPath", extraJars)
if (KylinConfig.getInstanceFromEnv.getQueryMemoryLimitDuringCollect > 0L) {
sparkConf.set("spark.sql.driver.maxMemoryUsageDuringCollect", KylinConfig.getInstanceFromEnv.getQueryMemoryLimitDuringCollect + "m")
}
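    // With event logging on, redirect spark.eventLog.dir to the Sparder-specific
    // directory and make sure it exists.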
val eventLogEnabled = sparkConf.getBoolean("spark.eventLog.enabled", defaultValue = false)
    if (eventLogEnabled && sparkConf.get("spark.eventLog.dir", "").nonEmpty) {
      val logDir = ExtractFactory.create.getSparderEvenLogDir()
      sparkConf.set("spark.eventLog.dir", logDir)
      val logPath = new Path(new URI(logDir).getPath)
      val fs = HadoopUtil.getWorkingFileSystem()
      if (!fs.exists(logPath)) {
        fs.mkdirs(logPath)
      }
    }
checkAndSetSparkPlugins(sparkConf)
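    // Register ContainerInitializeListener ahead of any user-configured listeners.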
if (KylinConfig.getInstanceFromEnv.isContainerSchedulerEnabled) {
ContainerInitializeListener.start()
val key = "spark.extraListeners";
val extraListeners = sparkConf.get(key, "")
if (extraListeners.isEmpty) {
sparkConf.set(key, "org.apache.spark.scheduler.ContainerInitializeListener")
} else {
sparkConf.set(key, "org.apache.spark.scheduler.ContainerInitializeListener," + extraListeners)
}
}
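    // Kylin config decides whether Spark's periodic GC of cleaned-up state is enabled.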
sparkConf.set("spark.cleaner.periodicGC.enabled", KylinConfig.getInstanceFromEnv.sparkPeriodicGCEnabled())
sparkConf
}