override def executeLine()

in linkis-engineconn-plugins/shell/src/main/scala/org/apache/linkis/manager/engineplugin/shell/executor/ShellEngineConnConcurrentExecutor.scala [92:245]


  override def executeLine(
      engineExecutionContext: EngineExecutionContext,
      code: String
  ): ExecuteResponse = {

    if (null != engineExecutionContext) {
      this.engineExecutionContext = engineExecutionContext
      logger.info("Shell executor reset new engineExecutionContext!")
    }

    if (engineExecutionContext.getJobId.isEmpty) {
      return ErrorExecuteResponse("taskID is null", null)
    }

    val taskId = engineExecutionContext.getJobId.get
    var bufferedReader: BufferedReader = null
    var errorsReader: BufferedReader = null

    val completed = new AtomicBoolean(false)
    var errReaderThread: ReaderThread = null
    var inputReaderThread: ReaderThread = null

    try {
      engineExecutionContext.appendStdout(s"$getId >> ${code.trim}")

      val argsArr =
        if (
            engineExecutionContext.getTotalParagraph == 1 &&
            engineExecutionContext.getProperties != null &&
            engineExecutionContext.getProperties.containsKey(
              ShellEngineConnPluginConst.RUNTIME_ARGS_KEY
            )
        ) {
          Utils.tryCatch {
            val argsList = engineExecutionContext.getProperties
              .get(ShellEngineConnPluginConst.RUNTIME_ARGS_KEY)
              .asInstanceOf[util.ArrayList[String]]
            logger.info(
              "Will execute shell task with user-specified arguments: \'" + argsList
                .toArray(new Array[String](argsList.size()))
                .mkString("\' \'") + "\'"
            )
            argsList.toArray(new Array[String](argsList.size()))
          } { t =>
            logger.warn(
              "Cannot read user-input shell arguments. Will execute shell task without them.",
              t
            )
            null
          }
        } else {
          null
        }

      val workingDirectory =
        if (
            engineExecutionContext.getTotalParagraph == 1 &&
            engineExecutionContext.getProperties != null &&
            engineExecutionContext.getProperties.containsKey(
              ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY
            )
        ) {
          Utils.tryCatch {
            val wdStr = engineExecutionContext.getProperties
              .get(ShellEngineConnPluginConst.SHELL_RUNTIME_WORKING_DIRECTORY)
              .asInstanceOf[String]
            if (isExecutePathExist(wdStr)) {
              logger.info(
                "Will execute shell task under user-specified working-directory: \'" + wdStr
              )
              wdStr
            } else {
              logger.warn(
                "User-specified working-directory: \'" + wdStr + "\' does not exist or user does not have access permission. " +
                  "Will execute shell task under default working-directory. Please contact the administrator!"
              )
              null
            }
          } { t =>
            logger.warn(
              "Cannot read user-input working-directory. Will execute shell task under default working-directory.",
              t
            )
            null
          }
        } else {
          null
        }

      val generatedCode = if (argsArr == null || argsArr.isEmpty) {
        generateRunCode(code)
      } else {
        generateRunCodeWithArgs(code, argsArr)
      }

      val processBuilder: ProcessBuilder = new ProcessBuilder(generatedCode: _*)
      if (StringUtils.isNotBlank(workingDirectory)) {
        processBuilder.directory(new File(workingDirectory))
      }
      val env = processBuilder.environment()
      val jobTags = JobUtils.getJobSourceTagsFromObjectMap(engineExecutionContext.getProperties)
      val jobId = JobUtils.getJobIdFromMap(engineExecutionContext.getProperties)
      if (StringUtils.isNotBlank(jobId)) {
        logger.info(s"set env job id ${jobId}.")
        env.put(ComputationExecutorConf.JOB_ID_TO_ENV_KEY, jobId)
      }
      if (StringUtils.isAsciiPrintable(jobTags)) {
        env.put(ECConstants.HIVE_OPTS, s" --hiveconf mapreduce.job.tags=$jobTags")
        env.put(ECConstants.SPARK_SUBMIT_OPTS, s" -Dspark.yarn.tags=$jobTags")
      }

      processBuilder.redirectErrorStream(false)
      val extractor = new YarnAppIdExtractor
      val process = processBuilder.start()
      bufferedReader = new BufferedReader(new InputStreamReader(process.getInputStream))
      errorsReader = new BufferedReader(new InputStreamReader(process.getErrorStream))
      // add task id and task Info cache
      shellECTaskInfoCache.put(taskId, ShellECTaskInfo(taskId, process, extractor))

      val counter: CountDownLatch = new CountDownLatch(2)
      inputReaderThread =
        new ReaderThread(engineExecutionContext, bufferedReader, extractor, true, counter)
      errReaderThread =
        new ReaderThread(engineExecutionContext, errorsReader, extractor, false, counter)

      logAsyncService.execute(inputReaderThread)
      logAsyncService.execute(errReaderThread)

      val exitCode = process.waitFor()
      counter.await()

      completed.set(true)

      if (exitCode != 0) {
        ErrorExecuteResponse(
          s"run shell failed with error:\n ${errReaderThread.getOutString()}",
          ShellCodeErrorException()
        )
      } else SuccessExecuteResponse()

    } catch {
      case e: Exception =>
        logger.error("Execute shell code failed, reason:", e)
        ErrorExecuteResponse("run shell failed", e)
    } finally {
      if (null != errorsReader) {
        inputReaderThread.onDestroy()
      }
      if (null != inputReaderThread) {
        errReaderThread.onDestroy()
      }
      shellECTaskInfoCache.remove(taskId)
    }
  }