def start()

in server/src/main/scala/org/apache/livy/server/LivyServer.scala [67:349]


  /**
   * Boots the Livy server: loads configuration, validates the Spark installation,
   * performs Kerberos login if security is enabled, initializes state-store and
   * session managers, mounts all HTTP servlets/filters, and finally starts the
   * embedded web server (and the optional Thrift server).
   *
   * Side effects: populates `livyConf`, `accessManager`, `_thriftServerFactory`,
   * `executor`, `ugi`, `zkManager`, `server`, and `_serverUrl`; registers a JVM
   * shutdown hook; sets the `livy.server.server-url` system property.
   * Calls `sys.exit(1)` on kinit failure or servlet-initialization failure.
   *
   * NOTE(review): the ordering of the steps below is intentional and fragile —
   * kinit/UGI login must precede any Hadoop call, and the ZooKeeper manager must
   * be started before `StateStore.init`. Do not reorder without care.
   */
  def start(): Unit = {
    // Load configuration from livy.conf and build the ACL manager from it.
    livyConf = new LivyConf().loadFromFile("livy.conf")
    accessManager = new AccessManager(livyConf)

    val host = livyConf.get(SERVER_HOST)
    val port = livyConf.getInt(SERVER_PORT)
    val basePath = livyConf.get(SERVER_BASE_PATH)
    // Upload size limit applied to every mounted servlet (see mount() below).
    val multipartConfig = MultipartConfig(
        maxFileSize = Some(livyConf.getLong(LivyConf.FILE_UPLOAD_MAX_SIZE))
      ).toMultipartConfigElement

    // Make sure the `spark-submit` program exists, otherwise much of livy won't work.
    testSparkHome(livyConf)

    // Test spark-submit and get Spark Scala version accordingly.
    val (sparkVersion, scalaVersionFromSparkSubmit) = sparkSubmitVersion(livyConf)
    testSparkVersion(sparkVersion)

    // If Spark and Scala version is set manually, should verify if they're consistent with
    // ones parsed from "spark-submit --version"
    val formattedSparkVersion = formatSparkVersion(sparkVersion)
    Option(livyConf.get(LIVY_SPARK_VERSION)).map(formatSparkVersion).foreach { version =>
      require(formattedSparkVersion == version,
        s"Configured Spark version $version is not equal to Spark version $formattedSparkVersion " +
          "got from spark-submit -version")
    }

    // Set formatted Spark and Scala version into livy configuration, this will be used by
    // session creation.
    // TODO Create a new class to pass variables from LivyServer to sessions and remove these
    // internal LivyConfs.
    livyConf.set(LIVY_SPARK_VERSION.key, formattedSparkVersion.productIterator.mkString("."))
    livyConf.set(LIVY_SPARK_SCALA_VERSION.key,
      sparkScalaVersion(formattedSparkVersion, scalaVersionFromSparkSubmit, livyConf))

    // The thrift server factory is resolved up-front so that both the UGI login
    // below and the servlet mounting/startup later can check for its presence.
    if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
      _thriftServerFactory = Some(ThriftServerFactory.getInstance)
    }

    if (UserGroupInformation.isSecurityEnabled) {
      // If Hadoop security is enabled, run kinit periodically. runKinit() should be called
      // before any Hadoop operation, otherwise Kerberos exception will be thrown.
      executor = Executors.newScheduledThreadPool(1,
        new ThreadFactory() {
          override def newThread(r: Runnable): Thread = {
            // Daemon thread so a pending kinit never blocks JVM shutdown.
            val thread = new Thread(r)
            thread.setName("kinit-thread")
            thread.setDaemon(true)
            thread
          }
        }
      )
      val launch_keytab = livyConf.get(LAUNCH_KERBEROS_KEYTAB)
      // Resolve "_HOST" in the principal pattern to this server's hostname.
      val launch_principal = SecurityUtil.getServerPrincipal(
        livyConf.get(LAUNCH_KERBEROS_PRINCIPAL), host)
      require(launch_keytab != null,
        s"Kerberos requires ${LAUNCH_KERBEROS_KEYTAB.key} to be provided.")
      require(launch_principal != null,
        s"Kerberos requires ${LAUNCH_KERBEROS_PRINCIPAL.key} to be provided.")
      if (!runKinit(launch_keytab, launch_principal)) {
        error("Failed to run kinit, stopping the server.")
        sys.exit(1)
      }
      // This is and should be the only place where a login() on the UGI is performed.
      // If an other login in the codebase is strictly needed, a needLogin check should be added to
      // avoid anyway that 2 logins are performed.
      // This is needed because the thriftserver requires the UGI to be created from a keytab in
      // order to work properly and previously Livy was using a UGI generated from the cached TGT
      // (created by the kinit command).
      if (livyConf.getBoolean(LivyConf.THRIFT_SERVER_ENABLED)) {
        UserGroupInformation.loginUserFromKeytab(launch_principal, launch_keytab)
      }
      ugi = UserGroupInformation.getCurrentUser
      // Schedule the periodic re-kinit so the TGT never expires while running.
      startKinitThread(launch_keytab, launch_principal)
    }

    // Fail fast if the configured recovery mode/state-store combination is invalid.
    testRecovery(livyConf)

    // Initialize YarnClient/KubernetesClient ASAP to save time.
    if (livyConf.isRunningOnYarn()) {
      SparkYarnApp.init(livyConf)
      // Fire-and-forget warm-up of the lazily created YARN client.
      Future { SparkYarnApp.yarnClient }
    } else if (livyConf.isRunningOnKubernetes()) {
      SparkKubernetesApp.init(livyConf)
    }

    // The ZooKeeper manager must be running before StateStore.init consumes it.
    if (livyConf.get(LivyConf.RECOVERY_STATE_STORE) == "zookeeper") {
      zkManager = Some(new ZooKeeperManager(livyConf))
      zkManager.foreach(_.start())
    }

    // Session persistence layer plus the two session managers (batch and
    // interactive) that recover state through it.
    StateStore.init(livyConf, zkManager)
    val sessionStore = new SessionStore(livyConf)
    val batchSessionManager = new BatchSessionManager(livyConf, sessionStore)
    val interactiveSessionManager = new InteractiveSessionManager(livyConf, sessionStore)

    server = new WebServer(livyConf, host, port)
    // NOTE(review): this points at a source-tree path; presumably only effective
    // in development runs, since static files are served from the jar below.
    server.context.setResourceBase("src/main/org/apache/livy/server")

    // Simple JSON endpoint exposing build/version metadata (mounted at /version).
    val livyVersionServlet = new JsonServlet {
      before() { contentType = "application/json" }

      get("/") {
        Map("version" -> LIVY_VERSION,
          "user" -> LIVY_BUILD_USER,
          "revision" -> LIVY_REVISION,
          "branch" -> LIVY_BRANCH,
          "date" -> LIVY_BUILD_DATE,
          "url" -> LIVY_REPO_URL)
      }
    }

    // Servlet for hosting static files such as html, css, and js
    // Necessary since Jetty cannot set it's resource base inside a jar
    // Returns 404 if the file does not exist
    val staticResourceServlet = new ScalatraServlet {
      get("/*") {
        // "splat" is the wildcard part of the matched /static/* path.
        val fileName = params("splat")
        val notFoundMsg = "File not found"

        if (!fileName.isEmpty) {
          // Resources are loaded from the classpath (inside the jar).
          getClass.getResourceAsStream(s"ui/static/$fileName") match {
            case is: InputStream => new BufferedInputStream(is)
            case null => NotFound(notFoundMsg)
          }
        } else {
          NotFound(notFoundMsg)
        }
      }
    }

    // Factory for tiny servlets that redirect "/" to the given path.
    def uiRedirectServlet(path: String) = new ScalatraServlet {
      get("/") {
        redirect(path)
      }
    }

    // All servlets are mounted from inside a ServletContextListener so that they
    // are registered once the servlet context is actually initialized by Jetty.
    server.context.addEventListener(
      new ServletContextListener() with MetricsBootstrap with ServletApiImplicits {

        // Registers a servlet under the given mappings and applies the shared
        // multipart (file-upload size) configuration.
        private def mount(sc: ServletContext, servlet: Servlet, mappings: String*): Unit = {
          val registration = sc.addServlet(servlet.getClass().getName(), servlet)
          registration.addMapping(mappings: _*)
          registration.setMultipartConfig(multipartConfig)
        }

        override def contextDestroyed(sce: ServletContextEvent): Unit = {

        }

        override def contextInitialized(sce: ServletContextEvent): Unit = {
          try {
            val context = sce.getServletContext()
            context.initParameters(org.scalatra.EnvironmentKey) = livyConf.get(ENVIRONMENT)

            // REST API endpoints for interactive and batch sessions.
            val interactiveServlet = new InteractiveSessionServlet(
              interactiveSessionManager, sessionStore, livyConf, accessManager)
            mount(context, interactiveServlet, "/sessions/*")

            val batchServlet =
              new BatchSessionServlet(batchSessionManager, sessionStore, livyConf, accessManager)
            mount(context, batchServlet, "/batches/*")

            if (livyConf.getBoolean(UI_ENABLED)) {
              // Web UI plus its static assets; "/" redirects into the UI.
              val uiServlet = new UIServlet(basePath, livyConf)
              mount(context, uiServlet, "/ui/*")
              mount(context, staticResourceServlet, "/static/*")
              mount(context, uiRedirectServlet(basePath + "/ui/"), "/*")
              _thriftServerFactory.foreach { factory =>
                mount(context, factory.getServlet(basePath), factory.getServletMappings: _*)
              }
            } else {
              // Without the UI, "/" falls through to the metrics page.
              mount(context, uiRedirectServlet(basePath + "/metrics"), "/*")
            }

            // Codahale/Dropwizard metrics admin page (from MetricsBootstrap).
            context.mountMetricsAdminServlet("/metrics")

            mount(context, livyVersionServlet, "/version/*")
          } catch {
            // Startup is all-or-nothing: any servlet wiring failure kills the JVM.
            case e: Throwable =>
              error("Exception thrown when initializing server", e)
              sys.exit(1)
          }
        }

      })

    if (livyConf.getBoolean(SECURITY_HEADERS_ENABLED)) {
      info("Adding security headers is enabled.")
      val securityHeadersHolder = new FilterHolder(new SecurityHeadersFilter(livyConf))
      server.context.addFilter(securityHeadersHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
    }

    // Install the authentication filter chosen by livy.server.auth.type:
    // "kerberos" (SPNEGO), "ldap", null (no auth), or a custom handler class
    // configured via livy.server.auth.<type>.class and ...<type>.param.* keys.
    livyConf.get(AUTH_TYPE) match {
      case authType @ KerberosAuthenticationHandler.TYPE =>
        val principal = SecurityUtil.getServerPrincipal(livyConf.get(AUTH_KERBEROS_PRINCIPAL),
          server.host)
        val keytab = livyConf.get(AUTH_KERBEROS_KEYTAB)
        require(principal != null,
          s"Kerberos auth requires ${AUTH_KERBEROS_PRINCIPAL.key} to be provided.")
        require(keytab != null,
          s"Kerberos auth requires ${AUTH_KERBEROS_KEYTAB.key} to be provided.")

        val holder = new FilterHolder(new AuthenticationFilter())
        holder.setInitParameter(AuthenticationFilter.AUTH_TYPE, authType)
        holder.setInitParameter(KerberosAuthenticationHandler.PRINCIPAL, principal)
        holder.setInitParameter(KerberosAuthenticationHandler.KEYTAB, keytab)
        holder.setInitParameter(KerberosAuthenticationHandler.NAME_RULES,
          livyConf.get(AUTH_KERBEROS_NAME_RULES))
        server.context.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
        info(s"SPNEGO auth enabled (principal = $principal)")

      case authType @ LdapAuthenticationHandlerImpl.TYPE =>
        val holder = new FilterHolder(new AuthenticationFilter())
        // dropRight(1) strips the trailing '$' from the Scala object's class name
        // so the Hadoop AuthenticationFilter can load the handler class.
        holder.setInitParameter(AuthenticationFilter.AUTH_TYPE,
          LdapAuthenticationHandlerImpl.getClass.getCanonicalName.dropRight(1))
        Option(livyConf.get(LivyConf.AUTH_LDAP_URL)).foreach { url =>
          holder.setInitParameter(LdapAuthenticationHandlerImpl.PROVIDER_URL, url)
        }
        Option(livyConf.get(LivyConf.AUTH_LDAP_USERNAME_DOMAIN)).foreach { domain =>
          holder.setInitParameter(LdapAuthenticationHandlerImpl.LDAP_BIND_DOMAIN, domain)
        }
        Option(livyConf.get(LivyConf.AUTH_LDAP_BASE_DN)).foreach { baseDN =>
          holder.setInitParameter(LdapAuthenticationHandlerImpl.BASE_DN, baseDN)
        }
        holder.setInitParameter(LdapAuthenticationHandlerImpl.SECURITY_AUTHENTICATION,
          livyConf.get(LivyConf.AUTH_LDAP_SECURITY_AUTH))
        holder.setInitParameter(LdapAuthenticationHandlerImpl.ENABLE_START_TLS,
          livyConf.get(LivyConf.AUTH_LDAP_ENABLE_START_TLS))
        server.context.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
        info("LDAP auth enabled.")

      case null =>
        // Nothing to do.

      case customType =>
        // Pluggable auth: the filter class comes from configuration, and every
        // livy.server.auth.<type>.param.X key becomes filter init-param X.
        val authClassConf = s"livy.server.auth.$customType.class"
        val authClass = livyConf.get(authClassConf)
        require(authClass != null, s"$customType auth requires $authClassConf to be provided")

        val holder = new FilterHolder()
        holder.setClassName(authClass)

        val prefix = s"livy.server.auth.$customType.param."
        livyConf.asScala.filter { kv =>
          kv.getKey.length > prefix.length && kv.getKey.startsWith(prefix)
        }.foreach { kv =>
          holder.setInitParameter(kv.getKey.substring(prefix.length), kv.getValue)
        }
        server.context.addFilter(holder, "/*", EnumSet.allOf(classOf[DispatcherType]))
        info(s"$customType auth enabled")
    }

    if (livyConf.getBoolean(CSRF_PROTECTION)) {
      info("CSRF protection is enabled.")
      val csrfHolder = new FilterHolder(new CsrfFilter())
      server.context.addFilter(csrfHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
    }

    if (accessManager.isAccessControlOn) {
      info("Access control is enabled")
      val accessHolder = new FilterHolder(new AccessFilter(accessManager))
      server.context.addFilter(accessHolder, "/*", EnumSet.allOf(classOf[DispatcherType]))
    }

    // All filters/servlets are registered; begin accepting HTTP requests.
    server.start()

    _thriftServerFactory.foreach {
      _.start(livyConf, interactiveSessionManager, sessionStore, accessManager)
    }

    // Best-effort graceful teardown on JVM exit: ZooKeeper first, then the web
    // server, then the thrift server.
    Runtime.getRuntime().addShutdownHook(new Thread("Livy Server Shutdown") {
      override def run(): Unit = {
        info("Shutting down Livy server.")
        zkManager.foreach(_.stop())
        server.stop()
        _thriftServerFactory.foreach(_.stop())
      }
    })

    // Publish the externally reachable URL (used e.g. by sessions/clients that
    // read the livy.server.server-url system property).
    _serverUrl = Some(s"${server.protocol}://${server.host}:${server.port}")
    sys.props("livy.server.server-url") = _serverUrl.get
  }