in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala [167:312]
override def createEngine(
    engineCreateRequest: EngineCreateRequest,
    sender: Sender
): EngineNode = {
  /*
   * Creates an EngineConn (EC) for the given request:
   *  1. builds labels, 2. selects an ECM, 3. generates the required resource,
   *  4. requests that resource (yielding a ticket id), 5-6. asks the ECM to start the EC,
   *  7. persists the new engine node, 8. attaches labels to it, and finally (unless the
   *  request opts out via ignoreTimeout after the timeout elapsed) ensures the EC is
   *  available when the availability rule requires it.
   *
   * @param engineCreateRequest the creation request; its properties map is defaulted to an
   *                            empty map when null
   * @param sender              caller of the request (not used in this method body)
   * @return the created EngineNode, with its ticket id set to the resource ticket id
   * @throws LinkisRetryException when resources are insufficient or persisting the engine
   *                              node fails; any failure from the ECM create call is rethrown
   */
  val startTime = System.currentTimeMillis
  // Default the properties map before anything reads it (task-id extraction, resource
  // generation and the EC creation description all use it).
  if (engineCreateRequest.getProperties == null) {
    engineCreateRequest.setProperties(new util.HashMap[String, String]())
  }
  val taskId = JobUtils.getJobIdFromStringMap(engineCreateRequest.getProperties)
  logger.info(s"Task: $taskId start to create Engine for request: $engineCreateRequest.")
  // A non-positive request timeout falls back to the configured max start time (ms; it is
  // compared against System.currentTimeMillis deltas below).
  val timeout =
    if (engineCreateRequest.getTimeout <= 0) {
      AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
    } else engineCreateRequest.getTimeout
  // 1. build label
  val labelList = buildLabel(engineCreateRequest.getLabels, engineCreateRequest.getUser)
  // 2. select suite ecm
  val emNode = selectECM(engineCreateRequest, labelList)
  // 3. generate Resource
  val resource =
    generateResource(
      engineCreateRequest.getProperties,
      engineCreateRequest.getUser,
      labelFilter.choseEngineLabel(labelList),
      timeout
    )
  // 4. request resource; the returned ticket id is also used as the instance id of the
  // EC before it is registered under its real service instance (see step 6)
  val resourceTicketId = resourceManager.requestResource(
    LabelUtils.distinctLabel(labelList, emNode.getLabels),
    resource,
    engineCreateRequest,
    timeout
  ) match {
    case AvailableResource(ticketId) =>
      ticketId
    case NotEnoughResource(reason) =>
      logger.warn(s"not enough resource: $reason")
      throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, s"not enough resource: $reason")
  }
  // 5. build engineConn request
  val engineBuildRequest = EngineConnBuildRequestImpl(
    resourceTicketId,
    labelFilter.choseEngineLabel(labelList),
    resource,
    EngineConnCreationDescImpl(
      engineCreateRequest.getCreateService,
      engineCreateRequest.getDescription,
      engineCreateRequest.getProperties
    )
  )
  // 6. Call ECM to send engine start request
  // AM will update the serviceInstance table
  // It is necessary to replace the ticketID and update the Label of EngineConn
  // It is necessary to modify the id in EngineInstanceLabel to Instance information
  val oldServiceInstance = new ServiceInstance
  oldServiceInstance.setApplicationName(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
  oldServiceInstance.setInstance(resourceTicketId)

  // Marks the EC registered under the ticket id as Failed and clears its info.
  // Best-effort: errors here are only logged so they cannot mask the exception that
  // triggered the cleanup.
  def clearFailedEngineConn(): Unit = Utils.tryCatch {
    val failedEcNode = getEngineNodeManager.getEngineNode(oldServiceInstance)
    if (null == failedEcNode) {
      logger.info(s" engineConn does not exist in db: $oldServiceInstance ")
    } else {
      failedEcNode.setLabels(nodeLabelService.getNodeLabels(oldServiceInstance))
      failedEcNode.getLabels.addAll(
        LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList), emNode.getLabels)
      )
      failedEcNode.setNodeStatus(NodeStatus.Failed)
      engineStopService.engineConnInfoClear(failedEcNode)
    }
  } { cleanupError =>
    logger.warn(s"Failed to clear info of failed engineConn $oldServiceInstance", cleanupError)
  }

  val engineNode = Utils.tryCatch(getEMService().createEngine(engineBuildRequest, emNode)) {
    case t: Throwable =>
      logger.warn(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}")
      clearFailedEngineConn()
      throw t
  }
  logger.info(
    s"Task: $taskId finished to create engineConn $engineNode. ticketId is $resourceTicketId"
  )
  engineNode.setTicketId(resourceTicketId)
  // 7.Update persistent information: including inserting engine/metrics
  Utils.tryCatch(getEngineNodeManager.updateEngineNode(oldServiceInstance, engineNode)) { t =>
    logger.warn(s"Failed to update engineNode $engineNode", t)
    t match {
      case _: LinkisRetryException =>
        // A retryable update failure leaves the EC running.
        logger.warn(
          s"node $oldServiceInstance update failed,caused by retry Exception, do not to stop ec"
        )
      case _ =>
        val stopEngineRequest =
          new EngineStopRequest(engineNode.getServiceInstance, ManagerUtils.getAdminUser)
        engineStopService.asyncStopEngine(stopEngineRequest)
    }
    clearFailedEngineConn()
    throw new LinkisRetryException(
      AMConstant.EM_ERROR_CODE,
      s"Failed to update engineNode: ${t.getMessage}"
    )
  }
  // 8. Add the Label of EngineConn, and add the Alias of engineConn
  val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
  engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
  labelList.add(engineConnAliasLabel)
  nodeLabelService.addLabelsToNode(
    engineNode.getServiceInstance,
    labelFilter.choseEngineLabel(
      LabelUtils.distinctLabel(labelList, fromEMGetEngineLabels(emNode.getLabels))
    )
  )
  if (System.currentTimeMillis - startTime >= timeout && engineCreateRequest.isIgnoreTimeout) {
    // The timeout is already used up and the creator opted out of waiting: skip the
    // availability check.
    logger.info(
      s"Return a EngineConn $engineNode for request: $engineCreateRequest since the creator set ignoreTimeout=true and maxStartTime is reached."
    )
  } else {
    val leftWaitTime = timeout - (System.currentTimeMillis - startTime)
    if (ECAvailableRule.getInstance.isNeedAvailable(labelList)) {
      ensureECAvailable(engineNode, resourceTicketId, leftWaitTime)
      logger.info(
        s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${System.currentTimeMillis() - startTime}ms"
      )
    } else {
      logger.info(
        s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${System.currentTimeMillis() - startTime}ms"
      )
    }
  }
  engineNode
}