in linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala [207:327]
/**
 * Executes the code carried by an [[EngineConnTask]] and returns the overall result.
 *
 * High-level flow:
 *   1. Mark the task Running and run "before execute" hooks (which may rewrite the code).
 *   2. Split the (possibly hooked) code into paragraphs via the code parser.
 *   3. Execute each paragraph in order, accumulating incomplete statements until they
 *      form a complete unit, stopping early on cancellation or error.
 *   4. Normalize the final response (Output -> Success, Incomplete -> Error).
 *
 * NOTE(review): this method uses non-local `return` inside closures (the hook error
 * handler and the `codes.indices.foreach` body). In Scala 2 these are implemented by
 * throwing `NonLocalReturnControl`; they still pass through the enclosing
 * `Utils.tryFinally`, so the counter decrease below always runs — but the pattern is
 * fragile and relies on `Utils.tryCatch` not swallowing control-flow throwables.
 *
 * @param engineConnTask  the task whose code is to be executed
 * @param internalExecute when true, marks this run as internally triggered rather than
 *                        user-submitted — presumably affects logging/status reporting
 *                        elsewhere in the class; TODO confirm against callers
 * @return the terminal [[ExecuteResponse]]: success, or an [[ErrorExecuteResponse]]
 *         describing the first failure (hook failure, execution error, kill, or
 *         result-set send failure)
 */
def toExecuteTask(
    engineConnTask: EngineConnTask,
    internalExecute: Boolean = false
): ExecuteResponse = {
  // Bump the running-task counter; the matching decrease is in the tryFinally
  // block at the bottom, so it runs even on the early `return`s below.
  runningTasks.increase()
  this.internalExecute = internalExecute
  Utils.tryFinally {
    transformTaskStatus(engineConnTask, ExecutionNodeStatus.Running)
    val engineExecutionContext = createEngineExecutionContext(engineConnTask)
    val engineCreationContext = EngineConnObject.getEngineCreationContext
    // Hooks are applied in registration order; each may transform the code in place.
    var hookedCode = engineConnTask.getCode
    Utils.tryCatch {
      ComputationExecutorHook.getComputationExecutorHooks.foreach(hook => {
        hookedCode =
          hook.beforeExecutorExecute(engineExecutionContext, engineCreationContext, hookedCode)
      })
    } { e =>
      e match {
        // A HookExecuteException is fatal for the task: count it as failed and
        // abort immediately (non-local return out of the enclosing method).
        case hookExecuteException: HookExecuteException =>
          failedTasks.increase()
          logger.error("failed to do with hook", e)
          return ErrorExecuteResponse("hook execute failed task will be failed", e)
        // Any other hook failure is tolerated: log and keep executing the
        // (possibly partially hooked) code.
        case _ => logger.info("failed to do with hook", e)
      }
    }
    // Log at most the first 100 characters of the hooked code to keep logs bounded.
    if (hookedCode.length > 100) {
      logger.info(s"hooked after code: ${hookedCode.substring(0, 100)} ....")
    } else {
      logger.info(s"hooked after code: $hookedCode ")
    }
    // task params log
    // spark engine: at org.apache.linkis.engineplugin.spark.executor.SparkEngineConnExecutor.executeLine log special conf
    // Only HIVE and TRINO print task params here; best-effort, never fails the task.
    Utils.tryAndWarn {
      val engineType = LabelUtil.getEngineType(engineCreationContext.getLabels())
      EngineType.mapStringToEngineType(engineType) match {
        case EngineType.HIVE | EngineType.TRINO => printTaskParamsLog(engineExecutionContext)
        case _ =>
      }
    }
    // Surface the engine's local log location in the task's stdout for users.
    val localPath = EngineConnConf.getLogDir
    engineExecutionContext.appendStdout(
      LogUtils.generateInfo(
        s"EngineConn local log path: ${DataWorkCloudApplication.getServiceInstance.toString} $localPath"
      )
    )
    var response: ExecuteResponse = null
    // Accumulates code fragments that the engine reported as incomplete, so a
    // following paragraph can be executed together with them via executeCompletely.
    val incomplete = new StringBuilder
    // Split the code into paragraphs; if no parser is configured or parsing fails,
    // fall back to submitting the entire code as a single unit.
    val codes =
      Utils.tryCatch(getCodeParser.map(_.parse(hookedCode)).getOrElse(Array(hookedCode))) { e =>
        logger.info("Your code will be submitted in overall mode.", e)
        Array(hookedCode)
      }
    engineExecutionContext.setTotalParagraph(codes.length)
    codes.indices.foreach({ index =>
      // Cooperative cancellation check before each paragraph; a kill observed here
      // aborts the whole method (non-local return), skipping remaining paragraphs.
      if (ExecutionNodeStatus.Cancelled == engineConnTask.getStatus) {
        return ErrorExecuteResponse("Job is killed by user!", null)
      }
      val code = codes(index)
      engineExecutionContext.setCurrentParagraph(index + 1)
      // If earlier paragraphs were incomplete, execute them together with this one;
      // otherwise execute this paragraph alone. Any throwable becomes an error response.
      response = Utils.tryCatch(if (incomplete.nonEmpty) {
        executeCompletely(engineExecutionContext, code, incomplete.toString())
      } else executeLine(engineExecutionContext, code)) { t =>
        ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(t), t)
      }
      // The current paragraph is appended unconditionally; it is cleared below on a
      // success/output response, or kept (plus a splitter) when the engine says the
      // statement is still incomplete.
      incomplete ++= code
      response match {
        case e: ErrorExecuteResponse =>
          // First failing paragraph fails the task; remaining paragraphs are skipped.
          failedTasks.increase()
          logger.error("execute code failed!", e.t)
          return response
        case SuccessExecuteResponse() =>
          engineExecutionContext.appendStdout("\n")
          incomplete.setLength(0)
        case e: OutputExecuteResponse =>
          incomplete.setLength(0)
          // Truncate very large outputs before echoing them to stdout; the full
          // output is still sent as a result set below.
          val output =
            if (StringUtils.isNotEmpty(e.getOutput) && e.getOutput.length > outputPrintLimit) {
              e.getOutput.substring(0, outputPrintLimit)
            } else e.getOutput
          engineExecutionContext.appendStdout(output)
          if (StringUtils.isNotBlank(e.getOutput)) engineExecutionContext.sendResultSet(e)
        case _: IncompleteExecuteResponse =>
          // Keep accumulating; the splitter separates this fragment from the next.
          incomplete ++= incompleteSplitter
      }
    })
    // Closing the context flushes result sets to the entrance; a failure here
    // overrides any earlier success and fails the task.
    Utils.tryCatch(engineExecutionContext.close()) { t =>
      response = ErrorExecuteResponse("send resultSet to entrance failed!", t)
      failedTasks.increase()
    }
    // Empty code (zero paragraphs) is treated as a trivially successful task.
    if (null == response && codes.isEmpty) {
      logger.warn("This code is empty, the task will be directly marked as successful")
      response = SuccessExecuteResponse()
    }
    // Normalize the terminal response: any output response counts as success; a
    // dangling incomplete response means the code never formed a complete statement.
    response = response match {
      case _: OutputExecuteResponse =>
        succeedTasks.increase()
        SuccessExecuteResponse()
      case s: SuccessExecuteResponse =>
        succeedTasks.increase()
        s
      case incompleteExecuteResponse: IncompleteExecuteResponse =>
        ErrorExecuteResponse(
          s"The task cannot be an incomplete response ${incompleteExecuteResponse.message}",
          null
        )
      case _ => response
    }
    response
  } {
    // Always runs — including on the early non-local returns above.
    runningTasks.decrease()
    this.internalExecute = false
  }
}