def getOrCreateKylinSession()

in src/spark-project/sparder/src/main/scala/org/apache/spark/sql/KylinSession.scala [126:211]


    def getOrCreateKylinSession(): SparkSession = synchronized {
      val options =
        getValue("options", builder)
          .asInstanceOf[scala.collection.mutable.HashMap[String, String]]
      val userSuppliedContext: Option[SparkContext] =
        getValue("userSuppliedContext", builder)
          .asInstanceOf[Option[SparkContext]]
      val extensions = getValue("extensions", builder)
        .asInstanceOf[SparkSessionExtensions]
      var (session, existingSharedState, parentSessionState) = SparkSession.getActiveSession match {
        case Some(sparkSession: KylinSession) =>
          if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
            options.foreach {
              case (k, v) => sparkSession.sessionState.conf.setConfString(k, v)
            }
            (sparkSession, None, None)
          } else if ((sparkSession ne null) && sparkSession.sparkContext.isStopped) {
            (null, Some(sparkSession.sharedState), Some(sparkSession.sessionState))
          } else {
            (null, None, None)
          }
        case _ => (null, None, None)
      }
      if (session ne null) {
        return session
      }

      // for testing only
      // discard existing shardState directly
      // use in case that external shard state needs to be set in UT
      if (Option(System.getProperty("kylin.spark.discard-shard-state")).getOrElse("false").toBoolean) {
        existingSharedState = None
        parentSessionState = None
      }

      // Global synchronization so we will only set the default session once.
      SparkSession.synchronized {
        // If the current thread does not have an active session, get it from the global session.
        session = SparkSession.getDefaultSession match {
          case Some(sparkSession: KylinSession) =>
            if ((sparkSession ne null) && !sparkSession.sparkContext.isStopped) {
              sparkSession
            } else {
              null
            }
          case _ => null
        }
        if (session ne null) {
          return session
        }
        val sparkContext = userSuppliedContext.getOrElse {
          // set app name if not given
          val conf = new SparkConf()
          options.foreach { case (k, v) => conf.set(k, v) }
          val sparkConf: SparkConf = if (queryCluster) {
            initSparkConf(conf)
          } else {
            conf
          }
          initLogicalViewConfig(conf)
          val sc = SparkContext.getOrCreate(sparkConf)
          // maybe this is an existing SparkContext, update its SparkConf which maybe used
          // by SparkSession

          if (sc.master.startsWith("yarn")) {
            Unsafe.setProperty("spark.ui.proxyBase", "/proxy/" + sc.applicationId)
          }

          sc
        }
        applyExtensions(
          sparkContext.getConf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS).getOrElse(Seq.empty),
          extensions)
        session = new KylinSession(sparkContext, existingSharedState, parentSessionState, extensions)
        SQLConf.setSQLConfGetter(() => session.sessionState.conf)
        SparkSession.setDefaultSession(session)
        SparkSession.setActiveSession(session)
        sparkContext.addSparkListener(new SparkListener {
          override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
            SparkSession.setDefaultSession(null)
          }
        })
        UdfManager.create(session)
        session
      }
    }