in linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/restful/EngineRestfulApi.java [120:266]
public Message askEngineConn(
    HttpServletRequest req, @RequestBody EngineAskRequest engineAskRequest)
    throws IOException, InterruptedException {
  // Asks for an EngineConn on behalf of the operating user. Unless the request carries the
  // execute-once label, an existing engine is reused first; otherwise (or if reuse yields no
  // engine) a new engine is created and locked for use. Errors raised by the create/use step are
  // surfaced as a LinkisRetryException and answered with "canRetry" = true; any other unexpected
  // error is answered without that flag.
  String userName = ModuleUserUtils.getOperationUser(req, "askEngineConn");
  engineAskRequest.setUser(userName);
  long timeout = engineAskRequest.getTimeOut();
  if (timeout <= 0) {
    // no positive timeout supplied by the caller: fall back to the configured REST max wait time
    timeout = AMConfiguration.ENGINE_CONN_START_REST_MAX_WAIT_TIME.getValue().toLong();
    engineAskRequest.setTimeOut(timeout);
  }
  logger.info(
      "User {} try to ask an engineConn with maxStartTime {}. EngineAskRequest is {}.",
      userName,
      ByteTimeUtils.msDurationToString(timeout),
      engineAskRequest);
  Sender sender = Sender.getSender(Sender.getThisServiceInstance());
  String taskId = JobUtils.getJobIdFromStringMap(engineAskRequest.getProperties());
  LoggerUtils.setJobIdMDC(taskId);
  // Clear the job-id MDC on every exit path (normal return, error response, or an
  // InterruptedException thrown while backing off between reuse attempts) so it does not leak
  // onto the next request handled by this thread.
  try {
    logger.info("received task : {}, engineAskRequest : {}", taskId, engineAskRequest);
    Map<String, Object> retEngineNode = new HashMap<>();
    // try to reuse ec first
    if (!engineAskRequest.getLabels().containsKey(LabelKeyConstant.EXECUTE_ONCE_KEY)) {
      EngineNode reuseNode = reuseEngineWithRetry(engineAskRequest, sender, taskId);
      if (null != reuseNode) {
        fillResultEngineNode(retEngineNode, reuseNode);
        return Message.ok("reuse engineConn ended.").data("engine", retEngineNode);
      }
    }
    EngineNode engineNode;
    try {
      engineNode = createAndUseEngine(engineAskRequest, sender, taskId, userName);
    } catch (LinkisRetryException retryException) {
      logger.error(
          "User {} create engineConn failed get retry exception. can be Retry",
          userName,
          retryException);
      return Message.error(
              String.format(
                  "Create engineConn failed, caused by %s.",
                  ExceptionUtils.getRootCauseMessage(retryException)))
          .data("canRetry", true);
    } catch (Exception e) {
      logger.error("User {} create engineConn failed", userName, e);
      return Message.error(
          String.format(
              "Create engineConn failed, caused by %s.", ExceptionUtils.getRootCauseMessage(e)));
    }
    fillResultEngineNode(retEngineNode, engineNode);
    logger.info(
        "Finished to create a engineConn for user {}. NodeInfo is {}.", userName, engineNode);
    // to transform to a map
    return Message.ok("create engineConn ended.").data("engine", retEngineNode);
  } finally {
    LoggerUtils.removeJobIdMDC();
  }
}

/**
 * Tries to reuse an existing EngineConn matching {@code engineAskRequest}.
 *
 * <p>A {@link LinkisRetryException} from the reuse service triggers further attempts (three
 * attempts in total), waiting one second between attempts; any other exception gives up on reuse
 * immediately.
 *
 * @param engineAskRequest the original ask request; its labels, timeout, user and properties are
 *     copied into the reuse request
 * @param sender the sender passed through to the reuse service
 * @param taskId job id used only for logging
 * @return the reused engine node, or {@code null} if no engine could be reused
 * @throws InterruptedException if interrupted while waiting between attempts
 */
private EngineNode reuseEngineWithRetry(
    EngineAskRequest engineAskRequest, Sender sender, String taskId)
    throws InterruptedException {
  EngineReuseRequest engineReuseRequest = new EngineReuseRequest();
  engineReuseRequest.setLabels(engineAskRequest.getLabels());
  engineReuseRequest.setTimeOut(engineAskRequest.getTimeOut());
  engineReuseRequest.setUser(engineAskRequest.getUser());
  engineReuseRequest.setProperties(engineAskRequest.getProperties());
  // number of retries after the first attempt
  final int maxRetry = 2;
  for (int attempt = 0; ; attempt++) {
    try {
      EngineNode reuseNode = engineReuseService.reuseEngine(engineReuseRequest, sender);
      if (null != reuseNode) {
        logger.info(
            "Finished to ask engine for task: {}, user: {} by reuse node {}",
            taskId,
            engineReuseRequest.getUser(),
            reuseNode);
      }
      return reuseNode;
    } catch (LinkisRetryException e) {
      logger.error(
          "task: {}, user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e);
      if (attempt >= maxRetry) {
        return null;
      }
      // back off only when another attempt will actually follow
      Thread.sleep(1000);
    } catch (Exception e1) {
      logger.info(
          "task: {} user: {} reuse engine failed", taskId, engineReuseRequest.getUser(), e1);
      return null;
    }
  }
}

/**
 * Creates a new EngineConn for {@code engineAskRequest} and then marks it as used via the engine
 * node manager.
 *
 * <p>Side effect: removes the {@code engineInstance} label from the request's label map.
 *
 * @param engineAskRequest the original ask request
 * @param sender the sender passed through to the create service
 * @param taskId job id used only for logging
 * @param userName operating user, used only for logging
 * @return the engine node returned by {@code useEngine}, never {@code null}
 * @throws LinkisRetryException if creating or using the engine fails; exceptions of other types
 *     raised in that step are wrapped, with the original attached as the cause
 */
private EngineNode createAndUseEngine(
    EngineAskRequest engineAskRequest, Sender sender, String taskId, String userName)
    throws LinkisRetryException {
  String engineAskAsyncId = EngineAskEngineService$.MODULE$.getAsyncId();
  logger.info(
      "Task: {}, start to async({}) createEngine: {}",
      taskId,
      engineAskAsyncId,
      engineAskRequest.getCreateService());
  // remove engineInstance label if exists
  engineAskRequest.getLabels().remove("engineInstance");
  EngineCreateRequest engineCreateRequest = new EngineCreateRequest();
  engineCreateRequest.setLabels(engineAskRequest.getLabels());
  engineCreateRequest.setTimeout(engineAskRequest.getTimeOut());
  engineCreateRequest.setUser(engineAskRequest.getUser());
  engineCreateRequest.setProperties(engineAskRequest.getProperties());
  engineCreateRequest.setCreateService(engineAskRequest.getCreateService());
  try {
    EngineNode createNode = engineCreateService.createEngine(engineCreateRequest, sender);
    // useEngine need to add timeout; fall back to the configured max start time if none is set
    long useTimeout =
        engineCreateRequest.getTimeout() <= 0
            ? AMConfiguration.ENGINE_START_MAX_TIME.getValue().toLong()
            : engineCreateRequest.getTimeout();
    EngineNode createEngineNode = engineNodeManager.useEngine(createNode, useTimeout);
    if (null == createEngineNode) {
      throw new LinkisRetryException(
          AMConstant.EM_ERROR_CODE,
          "create engine "
              + createNode.getServiceInstance()
              + " success, but to use engine failed");
    }
    logger.info(
        "Task: {} finished to ask engine for user {} by create node {}",
        taskId,
        engineAskRequest.getUser(),
        createEngineNode);
    return createEngineNode;
  } catch (Exception e) {
    logger.error("Task: {} failed to ask engine for user {} by create node", taskId, userName, e);
    if (e instanceof LinkisRetryException) {
      // already retryable: propagate as-is instead of re-wrapping its message
      throw (LinkisRetryException) e;
    }
    LinkisRetryException retryException =
        new LinkisRetryException(AMConstant.EM_ERROR_CODE, e.getMessage());
    // keep the original failure so logs and getRootCauseMessage can reach the real cause
    retryException.initCause(e);
    throw retryException;
  }
}