def toExecuteTask()

in linkis-computation-governance/linkis-engineconn/linkis-computation-engineconn/src/main/scala/org/apache/linkis/engineconn/computation/executor/execute/ComputationExecutor.scala [207:327]


  /**
   * Executes the code carried by `engineConnTask` and returns the final response.
   *
   * The method runs the following stages in order:
   *   1. Moves the task to `ExecutionNodeStatus.Running` and creates its execution context.
   *   1. Feeds the task code through every registered `ComputationExecutorHook`. A
   *      `HookExecuteException` fails the task immediately; any other hook exception is logged
   *      and the code produced so far is used.
   *   1. Splits the hooked code into paragraphs with the code parser (the whole code is used as a
   *      single paragraph when no parser is available or parsing throws).
   *   1. Executes the paragraphs one by one. Execution stops at the first error response, or when
   *      the task status is found to be `Cancelled` before a paragraph starts.
   *   1. Closes the execution context and normalises the last response: output/success become
   *      `SuccessExecuteResponse`, a trailing incomplete response becomes an
   *      `ErrorExecuteResponse`.
   *
   * `runningTasks` is increased on entry and decreased on exit; `this.internalExecute` is set to
   * the given flag on entry and reset to `false` on exit, whatever the outcome.
   *
   * NOTE(review): the three early exits (hook failure, cancellation, paragraph error) return
   * without calling `engineExecutionContext.close()`, and cancellation does not touch
   * `failedTasks`. Both behaviours are preserved from the previous implementation -- confirm they
   * are intended.
   *
   * @param engineConnTask
   *   the task to execute; its code is read once and its status is checked before each paragraph
   * @param internalExecute
   *   value stored in `this.internalExecute` for the duration of this call
   * @return
   *   a `SuccessExecuteResponse` when every paragraph completed, otherwise an
   *   `ErrorExecuteResponse` describing the first failure
   */
  def toExecuteTask(
      engineConnTask: EngineConnTask,
      internalExecute: Boolean = false
  ): ExecuteResponse = {
    runningTasks.increase()
    this.internalExecute = internalExecute
    Utils.tryFinally {
      transformTaskStatus(engineConnTask, ExecutionNodeStatus.Running)
      val engineExecutionContext = createEngineExecutionContext(engineConnTask)
      val engineCreationContext = EngineConnObject.getEngineCreationContext

      // Hooks rewrite the code in sequence; `hookedCode` keeps the result of the last hook that
      // completed, even if a later hook throws.
      var hookedCode = engineConnTask.getCode
      val hookFailure = Utils.tryCatch {
        ComputationExecutorHook.getComputationExecutorHooks.foreach { hook =>
          hookedCode =
            hook.beforeExecutorExecute(engineExecutionContext, engineCreationContext, hookedCode)
        }
        Option.empty[ExecuteResponse]
      } {
        case e: HookExecuteException =>
          failedTasks.increase()
          logger.error("failed to do with hook", e)
          Option[ExecuteResponse](
            ErrorExecuteResponse("hook execute failed task will be failed", e)
          )
        case e =>
          // Any other hook problem is tolerated: keep going with the code we have.
          logger.warn("failed to do with hook", e)
          Option.empty[ExecuteResponse]
      }

      hookFailure.getOrElse {
        if (hookedCode.length > 100) {
          logger.info(s"hooked after code: ${hookedCode.substring(0, 100)} ....")
        } else {
          logger.info(s"hooked after code: $hookedCode ")
        }

        // task params log
        // spark engine: at org.apache.linkis.engineplugin.spark.executor.SparkEngineConnExecutor.executeLine log special conf
        Utils.tryAndWarn {
          val engineType = LabelUtil.getEngineType(engineCreationContext.getLabels())
          EngineType.mapStringToEngineType(engineType) match {
            case EngineType.HIVE | EngineType.TRINO => printTaskParamsLog(engineExecutionContext)
            case _ =>
          }
        }

        val localPath = EngineConnConf.getLogDir
        engineExecutionContext.appendStdout(
          LogUtils.generateInfo(
            s"EngineConn local log path: ${DataWorkCloudApplication.getServiceInstance.toString} $localPath"
          )
        )

        // Accumulates paragraphs that the engine reported as incomplete so that they can be
        // re-submitted together with the next paragraph.
        val incomplete = new StringBuilder
        val codes =
          Utils.tryCatch(getCodeParser.map(_.parse(hookedCode)).getOrElse(Array(hookedCode))) {
            e =>
              logger.info("Your code will be submitted in overall mode.", e)
              Array(hookedCode)
          }
        engineExecutionContext.setTotalParagraph(codes.length)

        var response: ExecuteResponse = null
        // Set as soon as execution has to stop before all paragraphs ran.
        var earlyExit: Option[ExecuteResponse] = None
        var index = 0
        while (earlyExit.isEmpty && index < codes.length) {
          if (ExecutionNodeStatus.Cancelled == engineConnTask.getStatus) {
            earlyExit = Some(ErrorExecuteResponse("Job is killed by user!", null))
          } else {
            val code = codes(index)
            engineExecutionContext.setCurrentParagraph(index + 1)

            response = Utils.tryCatch(if (incomplete.nonEmpty) {
              executeCompletely(engineExecutionContext, code, incomplete.toString())
            } else executeLine(engineExecutionContext, code)) { t =>
              ErrorExecuteResponse(ExceptionUtils.getRootCauseMessage(t), t)
            }

            incomplete ++= code
            response match {
              case e: ErrorExecuteResponse =>
                failedTasks.increase()
                logger.error("execute code failed!", e.t)
                earlyExit = Some(response)
              case SuccessExecuteResponse() =>
                engineExecutionContext.appendStdout("\n")
                incomplete.setLength(0)
              case e: OutputExecuteResponse =>
                incomplete.setLength(0)
                // Only the text echoed to stdout is truncated; the full output is still sent as
                // a result set.
                val output =
                  if (
                      StringUtils.isNotEmpty(e.getOutput) && e.getOutput.length > outputPrintLimit
                  ) {
                    e.getOutput.substring(0, outputPrintLimit)
                  } else e.getOutput
                engineExecutionContext.appendStdout(output)
                if (StringUtils.isNotBlank(e.getOutput)) engineExecutionContext.sendResultSet(e)
              case _: IncompleteExecuteResponse =>
                incomplete ++= incompleteSplitter
            }
          }
          index += 1
        }

        earlyExit.getOrElse {
          Utils.tryCatch(engineExecutionContext.close()) { t =>
            logger.error("send resultSet to entrance failed!", t)
            response = ErrorExecuteResponse("send resultSet to entrance failed!", t)
            failedTasks.increase()
          }

          if (null == response && codes.isEmpty) {
            logger.warn("This code is empty, the task will be directly marked as successful")
            response = SuccessExecuteResponse()
          }
          response match {
            case _: OutputExecuteResponse =>
              succeedTasks.increase()
              SuccessExecuteResponse()
            case success: SuccessExecuteResponse =>
              succeedTasks.increase()
              success
            case incompleteResponse: IncompleteExecuteResponse =>
              // The task ends as an error, so it has to be counted like every other failure.
              failedTasks.increase()
              ErrorExecuteResponse(
                s"The task cannot be an incomplete response ${incompleteResponse.message}",
                null
              )
            case other => other
          }
        }
      }
    } {
      runningTasks.decrease()
      this.internalExecute = false
    }
  }