in src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala [126:211]
/**
 * Returns the process-wide [[KylinSession]], creating one if none is usable.
 *
 * Lookup order:
 *   1. the thread's active session, if it is a live KylinSession (new `options`
 *      are applied to its conf before returning it);
 *   2. the global default session, checked under `SparkSession.synchronized`;
 *   3. otherwise a new KylinSession is built on a (possibly pre-supplied or
 *      reused) SparkContext and registered as both default and active session.
 *
 * If a previously active KylinSession exists but its SparkContext is stopped,
 * its SharedState/SessionState are carried over into the new session.
 */
def getOrCreateKylinSession(): SparkSession = synchronized {
// Read the builder's private fields ("options", "userSuppliedContext",
// "extensions") via the `getValue` helper defined elsewhere in this file.
val options =
getValue("options", builder)
.asInstanceOf[scala.collection.mutable.HashMap[String, String]]
val userSuppliedContext: Option[SparkContext] =
getValue("userSuppliedContext", builder)
.asInstanceOf[Option[SparkContext]]
val extensions = getValue("extensions", builder)
.asInstanceOf[SparkSessionExtensions]
// Fast path: reuse the thread-local active session when it is a live
// KylinSession. If it exists but its context is stopped, keep its shared
// and session state so the replacement session can inherit them.
var (session, existingSharedState, parentSessionState) = SparkSession.getActiveSession match {
case Some(sparkSession: KylinSession) =>
if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
// Live session: push any newly supplied options into its conf.
options.foreach {
case (k, v) => sparkSession.sessionState.conf.setConfString(k, v)
}
(sparkSession, None, None)
} else if ((sparkSession ne null) && sparkSession.sparkContext.isStopped) {
// Stopped context: remember state to transplant into the new session.
(null, Some(sparkSession.sharedState), Some(sparkSession.sessionState))
} else {
(null, None, None)
}
case _ => (null, None, None)
}
if (session ne null) {
return session
}
// for testing only
// discard existing shardState directly
// use in case that external shard state needs to be set in UT
if (Option(System.getProperty("kylin.spark.discard-shard-state")).getOrElse("false").toBoolean) {
existingSharedState = None
parentSessionState = None
}
// Global synchronization so we will only set the default session once.
SparkSession.synchronized {
// If the current thread does not have an active session, get it from the global session.
session = SparkSession.getDefaultSession match {
case Some(sparkSession: KylinSession) =>
if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
sparkSession
} else {
null
}
case _ => null
}
if (session ne null) {
return session
}
// No usable session anywhere: obtain a SparkContext — either the one the
// caller supplied through the builder, or get-or-create one from a conf
// assembled out of `options`.
val sparkContext = userSuppliedContext.getOrElse {
// set app name if not given
val conf = new SparkConf()
options.foreach { case (k, v) => conf.set(k, v) }
// `queryCluster` decides whether the conf gets Kylin's query-cluster
// defaults applied (initSparkConf) before the context is created.
val sparkConf: SparkConf = if (queryCluster) {
initSparkConf(conf)
} else {
conf
}
// NOTE(review): initLogicalViewConfig is applied to `conf`, not to
// `sparkConf`; if initSparkConf ever returns a different SparkConf
// instance these settings would be lost — confirm the two always alias.
initLogicalViewConfig(conf)
val sc = SparkContext.getOrCreate(sparkConf)
// maybe this is an existing SparkContext, update its SparkConf which maybe used
// by SparkSession
if (sc.master.startsWith("yarn")) {
// On YARN the UI is reached through the RM proxy; point the UI base there.
Unsafe.setProperty("spark.ui.proxyBase", "/proxy/" + sc.applicationId)
}
sc
}
// Merge extensions configured via spark.sql.extensions into the builder's
// extensions object before constructing the session.
applyExtensions(
sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
extensions)
session = new KylinSession(sparkContext, existingSharedState, parentSessionState, extensions)
// Route global SQLConf lookups through this session's conf, then publish
// the session as both the global default and this thread's active session.
SQLConf.setSQLConfGetter(() => session.sessionState.conf)
SparkSession.setDefaultSession(session)
SparkSession.setActiveSession(session)
// Clear the default session once the application ends so a stale session
// is not handed out after the context stops.
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
SparkSession.setDefaultSession(null)
}
})
// Register Kylin's UDFs on the freshly created session.
UdfManager.create(session)
session
}
}