def pushLogToFrontend()

in linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala [382:498]


  def pushLogToFrontend(job: Job, log: String): Unit = {
    import LogReader._
    if (StringUtils.isBlank(log)) return
    var message: Message = null
    val logs: Array[String] = new Array[String](4)
    val logArr: Array[String] = log.split("\n\n").filter(StringUtils.isNotBlank)
    val info = new StringBuilder
    val warn = new StringBuilder
    val error = new StringBuilder
    val all = new StringBuilder
    val length = logArr.length
    logArr.foreach(singleLog => {
      if (StringUtils.isNotEmpty(singleLog)) {
        singleLog match {
          case ERROR_HEADER1() | ERROR_HEADER2() =>
            concatLog(length, singleLog, error, all)
          case WARN_HEADER1() | WARN_HEADER2() =>
            val arr =
              EntranceConfiguration.LOG_WARN_EXCLUDE.getValue.split(",").map(word => word.trim)
            var flag = false
            for (keyword <- arr) {
              flag = singleLog.contains(keyword) || flag
            }
            if (!flag) {
              val message = singleLog.split("\n")(0)
              concatLog(length, message, warn, all)
            }
          case INFO_HEADER1() | INFO_HEADER2() =>
            val hiveLogSpecial: String = EntranceConfiguration.HIVE_SPECIAL_LOG_INCLUDE.getValue
            val sparkLogSpecial: String = EntranceConfiguration.SPARK_SPECIAL_LOG_INCLUDE.getValue
            val hiveCreateTableLog: String = EntranceConfiguration.HIVE_CREATE_TABLE_LOG.getValue
            if (singleLog.contains(hiveLogSpecial) && singleLog.contains(hiveCreateTableLog)) {
              val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
              val printInfo = EntranceConfiguration.HIVE_PRINT_INFO_LOG.getValue
              val start = singleLog.indexOf(threadName)
              val end = singleLog.indexOf(printInfo) + printInfo.length
              if (start > 0 && end > 0) {
                val realLog =
                  singleLog.substring(0, start) + singleLog.substring(end, singleLog.length)
                concatLog(length, realLog, info, all)
              }
            }
            if (
                singleLog.contains(hiveLogSpecial) && singleLog.contains("map") && singleLog
                  .contains("reduce")
            ) {
              val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
              val stageName = EntranceConfiguration.HIVE_STAGE_NAME.getHotValue()
              val start = singleLog.indexOf(threadName)
              val end = singleLog.indexOf(stageName)
              if (start > 0 && end > 0) {
                val realLog =
                  singleLog.substring(0, start) + singleLog.substring(end, singleLog.length)
                concatLog(length, realLog, info, all)
              }
            } else if (singleLog.contains(sparkLogSpecial)) {
              val className = EntranceConfiguration.SPARK_PROGRESS_NAME.getValue
              val endFlag = EntranceConfiguration.END_FLAG.getValue
              val start = singleLog.indexOf(className)
              val end = singleLog.indexOf(endFlag) + endFlag.length
              if (start > 0 && end > 0) {
                val realLog =
                  singleLog.substring(0, start) + singleLog.substring(end, singleLog.length)
                concatLog(length, realLog, info, all)
              }
            } else {
              val arr =
                EntranceConfiguration.LOG_EXCLUDE.getValue.split(",").map(word => word.trim)
              var flag = false
              for (keyword <- arr) {
                flag = singleLog.contains(keyword) || flag
              }
              if (!flag) concatLog(length, singleLog, info, all)
            }
          case _ =>
            val arr = EntranceConfiguration.LOG_EXCLUDE.getValue.split(",").map(word => word.trim)
            var flag = false
            for (keyword <- arr) {
              flag = singleLog.contains(keyword) || flag
            }
            if (!flag) concatLog(length, singleLog, info, all)
        }
      }
    })
    if (
        StringUtils.isBlank(info.toString()) &&
        StringUtils.isBlank(warn.toString()) &&
        StringUtils.isBlank(error.toString()) &&
        StringUtils.isBlank(all.toString())
    ) {
      return
    }
    val logList: util.List[String] = new util.ArrayList[String]()
    logList.add(error.toString())
    logList.add(warn.toString())
    logList.add(info.toString())
    logList.add(all.toString())
    message = Message.ok("Return log information(返回日志信息)")
    val jobRequest = job.asInstanceOf[EntranceJob].getJobRequest
    val engineType = LabelUtil.getEngineType(jobRequest.getLabels)
    val creator = LabelUtil.getUserCreator(jobRequest.getLabels)._2
    val executeApplicationName = engineType
    val execID: String = ZuulEntranceUtils.generateExecID(
      job.getId,
      executeApplicationName,
      Sender.getThisInstance,
      creator
    )
    message.setMethod(restfulURI + "entrance/" + execID + "/log")
    val taskID = jobRequest.getId
    message
      .data("execID", execID)
      .data("log", logList)
      .data("websocketTag", websocketTagJobID.get(job.getId))
      .data("taskID", taskID)
    sendMsg(job, message)
  }