private def create()

in kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala [205:337]


  /**
   * Returns the address of the engine for this reference, launching a new engine if none is
   * registered yet. Runs inside `tryWithLock`, so concurrent callers sharing the same lock path
   * serialize here.
   *
   * @param discoveryClient client used to look up an existing engine and to poll for the new
   *                        engine's registration
   * @param extraEngineLog  optional operation log handed to the process builder
   * @return the engine address as reported by the discovery client (presumably host and port)
   */
  private def create(
      discoveryClient: DiscoveryClient,
      extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(discoveryClient) {
    // Another process may have launched the engine while we were waiting for the lock.
    // `getOrElse` is used instead of `return`: a `return` inside the by-name block passed to
    // tryWithLock would be a non-local return, implemented by throwing a control exception.
    discoveryClient.getServerHost(engineSpace).getOrElse {
      launchEngineAndAwaitRegistration(discoveryClient, extraEngineLog)
    }
  }

  /**
   * Builds and starts the engine process for `engineType`, then blocks until the engine
   * registers in discovery under `engineRefId`, the start-up timeout elapses, or a failure is
   * detected. Called by `create` while inside `tryWithLock`.
   *
   * @param discoveryClient client polled for the engine registration
   * @param extraEngineLog  optional operation log handed to the process builder
   * @return the address registered for `engineRefId` under `engineSpace`
   * @throws KyuubiSQLException if no builder permit is acquired within `timeout`, if the engine
   *                            does not register within `timeout`, or if the engine manager
   *                            reports the application as terminated; if the submit process
   *                            exits with a non-zero code, `builder.getError` is rethrown
   */
  private def launchEngineAndAwaitRegistration(
      discoveryClient: DiscoveryClient,
      extraEngineLog: Option[OperationLog]): (String, Int) = {
    conf.set(HA_NAMESPACE, engineSpace)
    conf.set(HA_ENGINE_REF_ID, engineRefId)
    val started = System.currentTimeMillis()
    conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
    builder = engineType match {
      case SPARK_SQL =>
        conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
        new SparkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
      case FLINK_SQL =>
        conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
        new FlinkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
      case TRINO =>
        new TrinoProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
      case HIVE_SQL =>
        conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName)
        HiveProcessBuilder(
          appUser,
          doAsEnabled,
          conf,
          engineRefId,
          extraEngineLog,
          defaultEngineName)
      case JDBC =>
        conf.setIfMissing(JdbcProcessBuilder.JDBC_ENGINE_NAME, defaultEngineName)
        JdbcProcessBuilder(
          appUser,
          doAsEnabled,
          conf,
          engineRefId,
          extraEngineLog,
          defaultEngineName)
      case CHAT =>
        new ChatProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
    }

    MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
    // Tracks whether this call still holds a builder permit, so `finally` releases it at most once.
    var acquiredPermit = false
    try {
      // With no semaphore configured, `forall` on None is true and no permit is needed.
      if (!startupProcessSemaphore.forall(_.tryAcquire(timeout, TimeUnit.MILLISECONDS))) {
        MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
        throw KyuubiSQLException(
          s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" +
            s" acquires a permit from engine builder semaphore.")
      }
      acquiredPermit = true
      val redactedCmd = builder.toString
      info(s"Launching engine:\n$redactedCmd")
      builder.validateConf()
      val process = builder.start
      var engineRef: Option[(String, Int)] = None
      var exitValue: Option[Int] = None
      // Whether the post-exit application-state check has already run at least once.
      var appStatePolled = false
      while (engineRef.isEmpty) {
        // While the submit process is alive, waitFor(1s) both detects its exit and paces the loop.
        if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
          exitValue = Some(process.exitValue())
          if (exitValue.contains(0)) {
            // The submit step finished; free the permit now rather than at the end of start-up.
            acquiredPermit = false
            startupProcessSemaphore.foreach(_.release())
          } else {
            val error = builder.getError
            MetricsSystem.tracing { ms =>
              ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
              ms.incCount(MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName))
            }
            throw error
          }
        }

        if (started + timeout <= System.currentTimeMillis()) {
          val killMessage =
            engineManager.killApplication(builder.appMgrInfo(), engineRefId, Some(appUser))
          builder.close(true)
          MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
          throw KyuubiSQLException(
            s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" +
              s" launched $engineType engine with $redactedCmd. $killMessage",
            builder.getError)
        }
        engineRef = discoveryClient.getEngineByRefId(engineSpace, engineRefId)

        // even the submit process succeeds, the application might meet failure when initializing,
        // check the engine application state from engine manager and fast fail on engine terminate
        if (engineRef.isEmpty && exitValue.contains(0)) {
          // After the submit process exits, waitFor no longer paces this loop. Pause between
          // rounds (the first check runs immediately) regardless of whether application info is
          // available or an engine manager is configured; otherwise the loop would spin on
          // discovery lookups until the engine registers or the timeout fires.
          if (appStatePolled) TimeUnit.SECONDS.sleep(1)
          appStatePolled = true

          Option(engineManager).foreach { engineMgr =>
            val applicationInfo = engineMgr.getApplicationInfo(
              builder.appMgrInfo(),
              engineRefId,
              Some(appUser),
              Some(started))

            applicationInfo.foreach { appInfo =>
              if (ApplicationState.isTerminated(appInfo.state)) {
                MetricsSystem.tracing { ms =>
                  ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
                  ms.incCount(MetricRegistry.name(ENGINE_FAIL, "ENGINE_TERMINATE"))
                }
                throw new KyuubiSQLException(
                  s"""
                     |The engine application has been terminated. Please check the engine log.
                     |ApplicationInfo: ${appInfo.toMap.mkString("(\n", ",\n", "\n)")}
                     |""".stripMargin,
                  builder.getError)
              }
            }
          }
        }
      }
      // The loop only exits once the engine has registered, so this is always defined.
      engineRef.get
    } finally {
      if (acquiredPermit) startupProcessSemaphore.foreach(_.release())
      val waitCompletion = conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
      val destroyProcess = !waitCompletion && builder.isClusterMode()
      if (destroyProcess) {
        info("Destroy the builder process because waitCompletion is false" +
          " and the engine is running in cluster mode.")
      }
      // we must close the process builder whether session open is success or failure since
      // we have a log capture thread in process builder.
      builder.close(destroyProcess)
    }
  }