override def createEngine()

in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/scala/org/apache/linkis/manager/am/service/engine/DefaultEngineCreateService.scala [167:312]


  /**
   * Creates an EngineConn for the given request and returns its node.
   *
   * The method runs the following steps in order:
   *   1. build the label list from the request labels and user;
   *   1. select an ECM node for the request;
   *   1. generate the resource description (an empty properties map is installed on the request
   *      first if it has none);
   *   1. request the resource from the resource manager and obtain a ticket id;
   *   1. build the EngineConn build request;
   *   1. ask the selected ECM to create the engine;
   *   1. update the persisted node, replacing the ticket-id based instance with the real one;
   *   1. attach the engine labels plus an alias label to the new node;
   *   1. unless the start timeout has elapsed and the request ignores timeouts, optionally wait
   *      for the EngineConn to become available.
   *
   * @param engineCreateRequest
   *   the creation request; a non-positive timeout is replaced by
   *   `AMConfiguration.ENGINE_START_MAX_TIME`
   * @param sender
   *   not read by this method
   * @return
   *   the created engine node, with its ticket id set
   * @throws LinkisRetryException
   *   if the resource manager reports not enough resource, or if updating the persisted engine
   *   node fails. Any throwable raised by the ECM while creating the engine is rethrown as-is
   *   after the failed node has been cleaned up.
   */
  override def createEngine(
      engineCreateRequest: EngineCreateRequest,
      sender: Sender
  ): EngineNode = {
    val startTime = System.currentTimeMillis
    val taskId = JobUtils.getJobIdFromStringMap(engineCreateRequest.getProperties)
    logger.info(s"Task: $taskId start to create Engine for request: $engineCreateRequest.")

    // A non-positive timeout on the request means "use the configured maximum start time".
    val timeout =
      if (engineCreateRequest.getTimeout <= 0) {
        AMConfiguration.ENGINE_START_MAX_TIME.getValue.toLong
      } else engineCreateRequest.getTimeout

    // 1 build label
    val labelList = buildLabel(engineCreateRequest.getLabels, engineCreateRequest.getUser)

    // 2 select suite ecm
    val emNode = selectECM(engineCreateRequest, labelList)

    // 3. generate Resource
    if (engineCreateRequest.getProperties == null) {
      engineCreateRequest.setProperties(new util.HashMap[String, String]())
    }
    val resource =
      generateResource(
        engineCreateRequest.getProperties,
        engineCreateRequest.getUser,
        labelFilter.choseEngineLabel(labelList),
        timeout
      )

    // 4. request resource
    val resourceTicketId = resourceManager.requestResource(
      LabelUtils.distinctLabel(labelList, emNode.getLabels),
      resource,
      engineCreateRequest,
      timeout
    ) match {
      case AvailableResource(ticketId) =>
        ticketId
      case NotEnoughResource(reason) =>
        // Build the message once so the log line and the exception stay in sync.
        val message = s"not enough resource: $reason"
        logger.warn(message)
        throw new LinkisRetryException(AMConstant.EM_ERROR_CODE, message)
    }

    // 5. build engineConn request
    val engineBuildRequest = EngineConnBuildRequestImpl(
      resourceTicketId,
      labelFilter.choseEngineLabel(labelList),
      resource,
      EngineConnCreationDescImpl(
        engineCreateRequest.getCreateService,
        engineCreateRequest.getDescription,
        engineCreateRequest.getProperties
      )
    )

    // 6. Call ECM to send engine start request
    // AM will update the serviceInstance table
    // It is necessary to replace the ticketID and update the Label of EngineConn
    // It is necessary to modify the id in EngineInstanceLabel to Instance information
    val oldServiceInstance = new ServiceInstance
    oldServiceInstance.setApplicationName(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
    oldServiceInstance.setInstance(resourceTicketId)

    // Shared cleanup for both failure paths below: look up the node still registered under the
    // ticket-id based instance, restore its labels, mark it Failed and clear its info.
    def clearFailedEngineConn(): Unit = {
      Option(getEngineNodeManager.getEngineNode(oldServiceInstance)) match {
        case None =>
          logger.info(s" engineConn does not exist in db: $oldServiceInstance ")
        case Some(failedEcNode) =>
          failedEcNode.setLabels(nodeLabelService.getNodeLabels(oldServiceInstance))
          failedEcNode.getLabels.addAll(
            LabelUtils.distinctLabel(labelFilter.choseEngineLabel(labelList), emNode.getLabels)
          )
          failedEcNode.setNodeStatus(NodeStatus.Failed)
          engineStopService.engineConnInfoClear(failedEcNode)
      }
    }

    val engineNode = Utils.tryCatch(getEMService().createEngine(engineBuildRequest, emNode)) {
      case t: Throwable =>
        logger.info(s"Failed to create ec($resourceTicketId) ask ecm ${emNode.getServiceInstance}")
        clearFailedEngineConn()
        // Propagate the original failure unchanged.
        throw t
    }

    logger.info(
      s"Task: $taskId finished to create  engineConn $engineNode. ticketId is $resourceTicketId"
    )
    engineNode.setTicketId(resourceTicketId)

    // 7.Update persistent information: including inserting engine/metrics
    Utils.tryCatch(getEngineNodeManager.updateEngineNode(oldServiceInstance, engineNode)) { t =>
      logger.warn(s"Failed to update engineNode $engineNode", t)
      t match {
        case _: LinkisRetryException =>
          // A retry exception leaves the started EC running; only the record is cleaned up.
          logger.warn(
            s"node $oldServiceInstance update failed,caused by retry Exception, do not to stop ec"
          )
        case _ =>
          // Any other failure: ask for the already-started EC to be stopped asynchronously.
          val stopEngineRequest =
            new EngineStopRequest(engineNode.getServiceInstance, ManagerUtils.getAdminUser)
          engineStopService.asyncStopEngine(stopEngineRequest)
      }
      clearFailedEngineConn()
      throw new LinkisRetryException(
        AMConstant.EM_ERROR_CODE,
        s"Failed to update engineNode: ${t.getMessage}"
      )
    }

    // 8. Add the Label of EngineConn, and add the Alias of engineConn
    val engineConnAliasLabel = labelBuilderFactory.createLabel(classOf[AliasServiceInstanceLabel])
    engineConnAliasLabel.setAlias(GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue)
    labelList.add(engineConnAliasLabel)
    nodeLabelService.addLabelsToNode(
      engineNode.getServiceInstance,
      labelFilter.choseEngineLabel(
        LabelUtils.distinctLabel(labelList, fromEMGetEngineLabels(emNode.getLabels))
      )
    )

    // 9. Either hand the node back immediately (timeout reached and the caller opted out of
    // waiting) or, when required by the availability rule, wait for the remaining time budget.
    if (System.currentTimeMillis - startTime >= timeout && engineCreateRequest.isIgnoreTimeout) {
      logger.info(
        s"Return a EngineConn $engineNode for request: $engineCreateRequest since the creator set ignoreTimeout=true and maxStartTime is reached."
      )
    } else {
      val leftWaitTime = timeout - (System.currentTimeMillis - startTime)
      if (ECAvailableRule.getInstance.isNeedAvailable(labelList)) {
        ensureECAvailable(engineNode, resourceTicketId, leftWaitTime)
        val elapsed = System.currentTimeMillis() - startTime
        logger.info(
          s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode. time taken ${elapsed}ms"
        )
      } else {
        val elapsed = System.currentTimeMillis() - startTime
        logger.info(
          s"Task: $taskId finished to create Engine for request: $engineCreateRequest and get engineNode $engineNode.And did not judge the availability,time taken ${elapsed}ms"
        )
      }
    }
    engineNode
  }