in kyuubi-server/src/main/scala/org/apache/kyuubi/engine/EngineRef.scala [205:337]
/**
 * Launches a new engine through a process builder and polls the discovery client until the
 * engine registered under `engineRefId` shows up. The whole body runs inside
 * `tryWithLock(discoveryClient)`.
 *
 * If `getServerHost` already reports an address for `engineSpace`, that address is returned
 * and nothing is launched. Otherwise the builder matching `engineType` is created and
 * started, and the method loops until `getEngineByRefId` returns a value, the submit
 * process fails, the engine application is reported as terminated, or `timeout`
 * milliseconds have elapsed since the submit time.
 *
 * @param discoveryClient client used to look up engines under `engineSpace`
 * @param extraEngineLog optional operation log handed to the process builder
 * @return the engine address found through the discovery client (presumably host and port)
 * @throws KyuubiSQLException if no semaphore permit is acquired within `timeout` ms, if the
 *                            engine is not discovered before the timeout, or if the engine
 *                            manager reports the application as terminated; the error
 *                            returned by `builder.getError` is rethrown when the submit
 *                            process exits with a non-zero code
 */
private def create(
    discoveryClient: DiscoveryClient,
    extraEngineLog: Option[OperationLog]): (String, Int) = tryWithLock(discoveryClient) {
  // Get the engine address ahead if another process has succeeded. `getOrElse` is used
  // instead of an early `return`, which would be a non-local return inside this block.
  discoveryClient.getServerHost(engineSpace).getOrElse {
    conf.set(HA_NAMESPACE, engineSpace)
    conf.set(HA_ENGINE_REF_ID, engineRefId)
    // The submit time is both published to the engine conf and used as the timeout origin.
    val started = System.currentTimeMillis()
    conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
    builder = engineType match {
      case SPARK_SQL =>
        conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
        new SparkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
      case FLINK_SQL =>
        conf.setIfMissing(FlinkProcessBuilder.APP_KEY, defaultEngineName)
        new FlinkProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
      case TRINO =>
        new TrinoProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
      case HIVE_SQL =>
        conf.setIfMissing(HiveProcessBuilder.HIVE_ENGINE_NAME, defaultEngineName)
        HiveProcessBuilder(
          appUser,
          doAsEnabled,
          conf,
          engineRefId,
          extraEngineLog,
          defaultEngineName)
      case JDBC =>
        conf.setIfMissing(JdbcProcessBuilder.JDBC_ENGINE_NAME, defaultEngineName)
        JdbcProcessBuilder(
          appUser,
          doAsEnabled,
          conf,
          engineRefId,
          extraEngineLog,
          defaultEngineName)
      case CHAT =>
        new ChatProcessBuilder(appUser, doAsEnabled, conf, engineRefId, extraEngineLog)
    }

    MetricsSystem.tracing(_.incCount(ENGINE_TOTAL))
    // Tracks whether the permit still has to be handed back by the `finally` block.
    var acquiredPermit = false
    try {
      // `forall` yields true when no semaphore is present, so the launch is only blocked
      // when a semaphore exists and no permit could be obtained within `timeout` ms.
      if (!startupProcessSemaphore.forall(_.tryAcquire(timeout, TimeUnit.MILLISECONDS))) {
        MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
        throw KyuubiSQLException(
          s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" +
            s" acquires a permit from engine builder semaphore.")
      }
      acquiredPermit = true
      val redactedCmd = builder.toString
      info(s"Launching engine:\n$redactedCmd")
      builder.validateConf()
      val process = builder.start
      var exitValue: Option[Int] = None
      var lastApplicationInfo: Option[ApplicationInfo] = None
      // Nothing was registered when this branch was entered, so polling starts from None.
      var engineRef: Option[(String, Int)] = None
      // NOTE(review): once the submit process has exited with 0, an iteration only sleeps
      // when application info was recorded on the previous pass; with no engine manager or
      // no application info the loop polls the discovery client back to back - confirm
      // that this polling rate is acceptable.
      while (engineRef.isEmpty) {
        // Wait up to one second for the submit process, but only until it has exited once.
        if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
          exitValue = Some(process.exitValue())
          if (exitValue.contains(0)) {
            // The submit process finished successfully: hand the permit back right away
            // instead of holding it while waiting for the engine to register.
            acquiredPermit = false
            startupProcessSemaphore.foreach(_.release())
          } else {
            val error = builder.getError
            MetricsSystem.tracing { ms =>
              ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
              ms.incCount(MetricRegistry.name(ENGINE_FAIL, error.getClass.getSimpleName))
            }
            throw error
          }
        }

        if (started + timeout <= System.currentTimeMillis()) {
          // The engine manager is treated as nullable further down, so guard it here too.
          // An unguarded call would raise a NullPointerException that hides the timeout
          // error and skips both the forced close of the builder and the timeout metric.
          val killMessage = Option(engineManager)
            .map(_.killApplication(builder.appMgrInfo(), engineRefId, Some(appUser)))
            .map(response => s"$response")
            .getOrElse("engine manager is unavailable, the application was not killed")
          builder.close(true)
          MetricsSystem.tracing(_.incCount(MetricRegistry.name(ENGINE_TIMEOUT, appUser)))
          throw KyuubiSQLException(
            s"Timeout($timeout ms, you can modify ${ENGINE_INIT_TIMEOUT.key} to change it) to" +
              s" launched $engineType engine with $redactedCmd. $killMessage",
            builder.getError)
        }
        engineRef = discoveryClient.getEngineByRefId(engineSpace, engineRefId)

        // Even if the submit process succeeds, the application might still fail while
        // initializing: check the application state from the engine manager and fail fast
        // once the engine has terminated.
        if (engineRef.isEmpty && exitValue.contains(0)) {
          Option(engineManager).foreach { engineMgr =>
            // Pause between checks once application info has been seen at least once.
            if (lastApplicationInfo.isDefined) {
              TimeUnit.SECONDS.sleep(1)
            }

            val applicationInfo = engineMgr.getApplicationInfo(
              builder.appMgrInfo(),
              engineRefId,
              Some(appUser),
              Some(started))
            applicationInfo.foreach { appInfo =>
              if (ApplicationState.isTerminated(appInfo.state)) {
                MetricsSystem.tracing { ms =>
                  ms.incCount(MetricRegistry.name(ENGINE_FAIL, appUser))
                  ms.incCount(MetricRegistry.name(ENGINE_FAIL, "ENGINE_TERMINATE"))
                }
                throw new KyuubiSQLException(
                  s"""
                     |The engine application has been terminated. Please check the engine log.
                     |ApplicationInfo: ${appInfo.toMap.mkString("(\n", ",\n", "\n)")}
                     |""".stripMargin,
                  builder.getError)
              }
            }

            lastApplicationInfo = applicationInfo
          }
        }
      }
      // The loop above only exits once `engineRef` is defined.
      engineRef.get
    } finally {
      // Release the permit unless it was already handed back after a successful submit.
      if (acquiredPermit) startupProcessSemaphore.foreach(_.release())
      val waitCompletion = conf.get(KyuubiConf.SESSION_ENGINE_STARTUP_WAIT_COMPLETION)
      val destroyProcess = !waitCompletion && builder.isClusterMode()
      if (destroyProcess) {
        info("Destroy the builder process because waitCompletion is false" +
          " and the engine is running in cluster mode.")
      }
      // we must close the process builder whether session open is success or failure since
      // we have a log capture thread in process builder.
      builder.close(destroyProcess)
    }
  }
}