in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineAskEngineService.scala [91:275]
/**
 * Handles an engine ask: runs the registered AskEngineConnHooks synchronously, then reuses or
 * creates an engine asynchronously and reports the outcome to `sender`.
 *
 * @param engineAskRequest the ask request; on the create path its label map is mutated
 *                         (ENGINE_INSTANCE_KEY removed, DRIVER_TASK_KEY set to the task id)
 * @param sender           receiver of the async EngineCreateSuccess / EngineCreateError; when it
 *                         is null no async reply is sent
 * @return an EngineCreateError right away if a hook throws, otherwise an EngineAskAsyncResponse
 *         carrying the async id under which the final result will be sent
 */
override def askEngine(engineAskRequest: EngineAskRequest, sender: Sender): Any = {
  val taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties)
  LoggerUtils.setJobIdMDC(taskId)
  // Clear the MDC on this thread on every exit path, including the early hook-failure reply.
  Utils.tryFinally {
    logger.info(s"received task: $taskId, engineAskRequest $engineAskRequest")
    runAskEngineConnHooks(engineAskRequest, sender) match {
      case Some(hookError) =>
        // An exception thrown by a hook blocks the ask: answer synchronously with an error.
        val engineAskAsyncId = getAsyncId
        val retryFlag = isRetryableAskFailure(hookError)
        logger.warn(s"Task: $taskId ask engine hook failed, can Retry $retryFlag", hookError)
        EngineCreateError(
          engineAskAsyncId,
          ExceptionUtils.getRootCauseMessage(hookError),
          retryFlag
        )
      case None =>
        val engineAskAsyncId = getAsyncId
        val createNodeThread = Future {
          LoggerUtils.setJobIdMDC(taskId)
          Utils.tryFinally(reuseOrCreateEngine(engineAskRequest, sender, taskId, engineAskAsyncId)) {
            LoggerUtils.removeJobIdMDC()
          }
        }
        createNodeThread.onComplete {
          case Success((engineNode, isReuse)) =>
            LoggerUtils.setJobIdMDC(taskId)
            Utils.tryFinally {
              val action = if (isReuse) "reuseEngine" else "createEngine"
              logger.info(s"Task: $taskId Success to async($engineAskAsyncId) $action $engineNode")
              if (null != sender) {
                sender.send(EngineCreateSuccess(engineAskAsyncId, engineNode, isReuse))
              } else {
                logger.info("Will not send async using null sender.")
              }
            } {
              LoggerUtils.removeJobIdMDC()
            }
          case Failure(exception) =>
            LoggerUtils.setJobIdMDC(taskId)
            Utils.tryFinally {
              val retryFlag = isRetryableAskFailure(exception)
              val msg =
                s"Task: $taskId Failed to async($engineAskAsyncId) create/reuse Engine, can Retry $retryFlag"
              if (!retryFlag) {
                logger.info(msg, exception)
              } else {
                logger.info(s"msg: ${msg} canRetry Exception: ${exception.getClass.getName}")
              }
              // Mirror the Success branch: a null sender must not turn into an NPE here.
              if (null != sender) {
                sender.send(
                  EngineCreateError(
                    engineAskAsyncId,
                    ExceptionUtils.getRootCauseMessage(exception),
                    retryFlag
                  )
                )
              } else {
                logger.info("Will not send async using null sender.")
              }
            } {
              LoggerUtils.removeJobIdMDC()
            }
        }
        EngineAskAsyncResponse(engineAskAsyncId, Sender.getThisServiceInstance)
    }
  } {
    LoggerUtils.removeJobIdMDC()
  }
}

/**
 * Runs every registered AskEngineConnHook in order, stopping at the first one that throws.
 * The thrown exception is returned instead of propagated so the caller can reply with an error.
 *
 * @return the first hook failure, or None when there are no hooks or all of them passed
 */
private def runAskEngineConnHooks(
    engineAskRequest: EngineAskRequest,
    sender: Sender
): Option[Throwable] = {
  var hookFailure: Option[Throwable] = None
  if (hooksArray != null && hooksArray.nonEmpty) {
    val ctx = new AskEngineConnHookContext(engineAskRequest, sender)
    hooksArray.foreach { hook =>
      // Once one hook has failed, the remaining hooks are not run.
      if (hookFailure.isEmpty) {
        hookFailure = Utils.tryCatch[Option[Throwable]] {
          hook.doHook(ctx)
          None
        } { t => Some(t) }
      }
    }
  }
  hookFailure
}

/**
 * Decides whether a failed ask may be retried by the caller: LinkisRetryException and
 * RetryableException are retryable, as is any failure whose root cause is a
 * SocketTimeoutException or TimeoutException.
 */
private def isRetryableAskFailure(t: Throwable): Boolean = t match {
  case _: LinkisRetryException => true
  case _: RetryableException => true
  case _ =>
    ExceptionUtils.getRootCause(t) match {
      case _: SocketTimeoutException => true
      case _: TimeoutException => true
      case _ => false
    }
}

/**
 * Reuses an existing engine when the request allows it, otherwise creates a new one, holding the
 * semaphore returned by getKeyAndSemaphore (when there is one) for the whole operation.
 *
 * The caller is responsible for setting and clearing the job-id MDC.
 *
 * @return the engine node and whether it was reused (true) or newly created (false)
 * @throws LinkisRetryException if a new engine was created but could not be put into use
 */
private def reuseOrCreateEngine(
    engineAskRequest: EngineAskRequest,
    sender: Sender,
    taskId: String,
    engineAskAsyncId: String
): (EngineNode, Boolean) = {
  // tryAndWarn yields null when getKeyAndSemaphore fails; destructuring that null directly would
  // throw a MatchError, so fall back to "no key, no lock" explicitly.
  val keyAndSemaphore = Utils.tryAndWarn(getKeyAndSemaphore(engineAskRequest.getLabels))
  val engineCreateKey = if (null == keyAndSemaphore) null else keyAndSemaphore._1
  val semaphore = if (null == keyAndSemaphore) null else keyAndSemaphore._2
  // Only release a permit that was actually obtained: releasing after a failed acquire would add
  // a permit (assuming a java.util.concurrent.Semaphore, which does not track ownership).
  var lockAcquired = false
  Utils.tryFinally {
    if (null != semaphore) {
      try {
        semaphore.acquire()
        lockAcquired = true
        logger.info(s"$engineCreateKey succeed to get lock")
      } catch {
        case e: Exception =>
          // Best effort: continue without the lock rather than failing the ask.
          logger.warn(
            s"Task: $taskId user ${engineAskRequest.getUser} acquire semaphore failed",
            e
          )
      }
    }
    val reuseNode =
      if (engineAskRequest.getLabels.containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) null
      else reuseEngineQuietly(engineAskRequest, sender, taskId)
    if (null != reuseNode) {
      logger.info(
        s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by reuse node $reuseNode"
      )
      (reuseNode, true)
    } else {
      logger.info(
        s"Task: $taskId start to async($engineAskAsyncId) createEngine, ${engineAskRequest.getCreateService}"
      )
      val createEngineNode = createAndUseEngine(engineAskRequest, sender, taskId)
      logger.info(
        s"Task: $taskId finished to ask engine for user ${engineAskRequest.getUser} by create node $createEngineNode"
      )
      (createEngineNode, false)
    }
  } {
    Utils.tryAndWarn {
      if (lockAcquired) {
        semaphore.release()
        logger.info(s"$engineCreateKey succeed to release lock")
      }
    }
  }
}

/**
 * Asks engineReuseService for a reusable engine. Any exception is logged and turned into null
 * so the caller falls back to creating an engine.
 *
 * @return the node returned by reuseEngine, or null if reuse threw
 */
private def reuseEngineQuietly(
    engineAskRequest: EngineAskRequest,
    sender: Sender,
    taskId: String
): EngineNode = {
  val engineReuseRequest = new EngineReuseRequest()
  engineReuseRequest.setLabels(engineAskRequest.getLabels)
  engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut)
  engineReuseRequest.setUser(engineAskRequest.getUser)
  engineReuseRequest.setProperties(engineAskRequest.getProperties)
  Utils.tryCatch(engineReuseService.reuseEngine(engineReuseRequest, sender)) { t: Throwable =>
    t match {
      // LinkisRetryException is logged by message only; anything else with its stack trace.
      case _: LinkisRetryException =>
        logger.info(
          s"Task: $taskId user ${engineAskRequest.getUser} reuse engine failed ${t.getMessage}"
        )
      case _ =>
        logger.info(s"Task: $taskId user ${engineAskRequest.getUser} reuse engine failed", t)
    }
    null
  }
}

/**
 * Creates a new engine for the ask and waits until it can be used. The wait is bounded by the
 * request timeout, or by AMConfiguration.ENGINE_START_MAX_TIME when that timeout is <= 0.
 *
 * Mutates the request's label map: removes ENGINE_INSTANCE_KEY and sets DRIVER_TASK_KEY to
 * taskId (the id of the task that drove this engine start).
 *
 * @throws LinkisRetryException if the engine was created but useEngine returned null
 */
private def createAndUseEngine(
    engineAskRequest: EngineAskRequest,
    sender: Sender,
    taskId: String
): EngineNode = {
  val labels = engineAskRequest.getLabels
  // If the original labels carry an engineInstance label, drop it before creating.
  labels.remove(LabelKeyConstant.ENGINE_INSTANCE_KEY)
  // Tag the new engine with the id of the task that drove its start.
  labels.put(LabelKeyConstant.DRIVER_TASK_KEY, taskId)
  val engineCreateRequest = new EngineCreateRequest
  engineCreateRequest.setLabels(labels)
  engineCreateRequest.setTimeout(engineAskRequest.getTimeOut)
  engineCreateRequest.setUser(engineAskRequest.getUser)
  engineCreateRequest.setProperties(engineAskRequest.getProperties)
  engineCreateRequest.setCreateService(engineAskRequest.getCreateService)
  val createNode = engineCreateService.createEngine(engineCreateRequest, sender)
  val timeout =
    if (engineCreateRequest.getTimeout <= 0) {
      AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
    } else engineCreateRequest.getTimeout
  // useEngine must be bounded by a timeout.
  val createEngineNode = getEngineNodeManager.useEngine(createNode, timeout)
  if (null == createEngineNode) {
    throw new LinkisRetryException(
      AMConstant.EM_ERROR_CODE,
      s"create engine${createNode.getServiceInstance} success, but to use engine failed"
    )
  }
  createEngineNode
}