in linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala [92:245]
override def executeLine(
engineExecutionContext: EngineExecutionContext,
code: String
): ExecuteResponse = {
if (null != engineExecutionContext) {
this.engineExecutionContext = engineExecutionContext
logger.info("Shell executor reset new engineExecutionContext!")
}
if (engineExecutionContext.getJobId.isEmpty) {
return ErrorExecuteResponse("taskID is null", null)
}
val taskId = engineExecutionContext.getJobId.get
var bufferedReader: BufferedReader = null
var errorsReader: BufferedReader = null
val completed = new AtomicBoolean(false)
var errReaderThread: ReaderThread = null
var inputReaderThread: ReaderThread = null
try {
engineExecutionContext.appendStdout(s"$getId >> ${code.trim}")
val argsArr =
if (
engineExecutionContext.getTotalParagraph == 1 &&
engineExecutionContext.getProperties != null &&
engineExecutionContext.getProperties.containsKey(
ShellEngineConnPluginConst.RUNTIME_ARGS_KEY
)
) {
Utils.tryCatch {
val argsList = engineExecutionContext.getProperties
.get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
.asInstanceOf[util.ArrayList[String]]
logger.info(
"Will execute shell task with user-specified arguments: \'" + argsList
.toArray(new Array[String](argsList.size()))
.mkString("\' \'") + "\'"
)
argsList.toArray(new Array[String](argsList.size()))
} { t =>
logger.warn(
"Cannot read user-input shell arguments. Will execute shell task without them.",
t
)
null
}
} else {
null
}
val workingDirectory =
if (
engineExecutionContext.getTotalParagraph == 1 &&
engineExecutionContext.getProperties != null &&
engineExecutionContext.getProperties.containsKey(
ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
)
) {
Utils.tryCatch {
val wdStr = engineExecutionContext.getProperties
.get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)
.asInstanceOf[String]
if (isExecutePathExist(wdStr)) {
logger.info(
"Will execute shell task under user-specified working-directory: \'" + wdStr
)
wdStr
} else {
logger.warn(
"User-specified working-directory: \'" + wdStr + "\' does not exist or user does not have access permission. " +
"Will execute shell task under default working-directory. Please contact the administrator!"
)
null
}
} { t =>
logger.warn(
"Cannot read user-input working-directory. Will execute shell task under default working-directory.",
t
)
null
}
} else {
null
}
val generatedCode = if (argsArr == null || argsArr.isEmpty) {
generateRunCode(code)
} else {
generateRunCodeWithArgs(code, argsArr)
}
val processBuilder: ProcessBuilder = new ProcessBuilder(generatedCode: _*)
if (StringUtils.isNotBlank(workingDirectory)) {
processBuilder.directory(new File(workingDirectory))
}
val env = processBuilder.environment()
val jobTags = JobUtils.getJobSourceTagsFromObjectMap(engineExecutionContext.getProperties)
val jobId = JobUtils.getJobIdFromMap(engineExecutionContext.getProperties)
if (StringUtils.isNotBlank(jobId)) {
logger.info(s"set env job id ${jobId}.")
env.put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
}
if (StringUtils.isAsciiPrintable(jobTags)) {
env.put(ECConstants.HIVE_OPTS, s" --hiveconf mapreduce.job.tags=$jobTags")
env.put(ECConstants.SPARK_SUBMIT_OPTS, s" -Dspark.yarn.tags=$jobTags")
}
processBuilder.redirectErrorStream(false)
val extractor = new YarnAppIdExtractor
val process = processBuilder.start()
bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
// add task id and task Info cache
shellECTaskInfoCache.put(taskId, ShellECTaskInfo(taskId, process, extractor))
val counter: CountDownLatch = new CountDownLatch(2)
inputReaderThread =
new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
errReaderThread =
new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter)
logAsyncService.execute(inputReaderThread)
logAsyncService.execute(errReaderThread)
val exitCode = process.waitFor()
counter.await()
completed.set(true)
if (exitCode != 0) {
ErrorExecuteResponse(
s"run shell failed with error:\n ${errReaderThread.getOutString()}",
ShellCodeErrorException()
)
} else SuccessExecuteResponse()
} catch {
case e: Exception =>
logger.error("Execute shell code failed, reason:", e)
ErrorExecuteResponse("run shell failed", e)
} finally {
if (null != errorsReader) {
inputReaderThread.onDestroy()
}
if (null != inputReaderThread) {
errReaderThread.onDestroy()
}
shellECTaskInfoCache.remove(taskId)
}
}