in linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceWebSocketService.scala [382:498]
def pushLogToFrontend(job: Job, log: String): Unit = {
import LogReader._
if (StringUtils.isBlank(log)) return
var message: Message = null
val logs: Array[String] = new Array[String](4)
val logArr: Array[String] = log.split("\n\n").filter(StringUtils.isNotBlank)
val info = new StringBuilder
val warn = new StringBuilder
val error = new StringBuilder
val all = new StringBuilder
val length = logArr.length
logArr.foreach(singleLog => {
if (StringUtils.isNotEmpty(singleLog)) {
singleLog match {
case ERROR_HEADER1() | ERROR_HEADER2() =>
concatLog(length, singleLog, error, all)
case WARN_HEADER1() | WARN_HEADER2() =>
val arr =
EntranceConfiguration.LOG_WARN_EXCLUDE.getValue.split(",").map(word => word.trim)
var flag = false
for (keyword <- arr) {
flag = singleLog.contains(keyword) || flag
}
if (!flag) {
val message = singleLog.split("\n")(0)
concatLog(length, message, warn, all)
}
case INFO_HEADER1() | INFO_HEADER2() =>
val hiveLogSpecial: String = EntranceConfiguration.HIVE_SPECIAL_LOG_INCLUDE.getValue
val sparkLogSpecial: String = EntranceConfiguration.SPARK_SPECIAL_LOG_INCLUDE.getValue
val hiveCreateTableLog: String = EntranceConfiguration.HIVE_CREATE_TABLE_LOG.getValue
if (singleLog.contains(hiveLogSpecial) && singleLog.contains(hiveCreateTableLog)) {
val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
val printInfo = EntranceConfiguration.HIVE_PRINT_INFO_LOG.getValue
val start = singleLog.indexOf(threadName)
val end = singleLog.indexOf(printInfo) + printInfo.length
if (start > 0 && end > 0) {
val realLog =
singleLog.substring(0, start) + singleLog.substring(end, singleLog.length)
concatLog(length, realLog, info, all)
}
}
if (
singleLog.contains(hiveLogSpecial) && singleLog.contains("map") && singleLog
.contains("reduce")
) {
val threadName = EntranceConfiguration.HIVE_THREAD_NAME.getHotValue()
val stageName = EntranceConfiguration.HIVE_STAGE_NAME.getHotValue()
val start = singleLog.indexOf(threadName)
val end = singleLog.indexOf(stageName)
if (start > 0 && end > 0) {
val realLog =
singleLog.substring(0, start) + singleLog.substring(end, singleLog.length)
concatLog(length, realLog, info, all)
}
} else if (singleLog.contains(sparkLogSpecial)) {
val className = EntranceConfiguration.SPARK_PROGRESS_NAME.getValue
val endFlag = EntranceConfiguration.END_FLAG.getValue
val start = singleLog.indexOf(className)
val end = singleLog.indexOf(endFlag) + endFlag.length
if (start > 0 && end > 0) {
val realLog =
singleLog.substring(0, start) + singleLog.substring(end, singleLog.length)
concatLog(length, realLog, info, all)
}
} else {
val arr =
EntranceConfiguration.LOG_EXCLUDE.getValue.split(",").map(word => word.trim)
var flag = false
for (keyword <- arr) {
flag = singleLog.contains(keyword) || flag
}
if (!flag) concatLog(length, singleLog, info, all)
}
case _ =>
val arr = EntranceConfiguration.LOG_EXCLUDE.getValue.split(",").map(word => word.trim)
var flag = false
for (keyword <- arr) {
flag = singleLog.contains(keyword) || flag
}
if (!flag) concatLog(length, singleLog, info, all)
}
}
})
if (
StringUtils.isBlank(info.toString()) &&
StringUtils.isBlank(warn.toString()) &&
StringUtils.isBlank(error.toString()) &&
StringUtils.isBlank(all.toString())
) {
return
}
val logList: util.List[String] = new util.ArrayList[String]()
logList.add(error.toString())
logList.add(warn.toString())
logList.add(info.toString())
logList.add(all.toString())
message = Message.ok("Return log information(返回日志信息)")
val jobRequest = job.asInstanceOf[EntranceJob].getJobRequest
val engineType = LabelUtil.getEngineType(jobRequest.getLabels)
val creator = LabelUtil.getUserCreator(jobRequest.getLabels)._2
val executeApplicationName = engineType
val execID: String = ZuulEntranceUtils.generateExecID(
job.getId,
executeApplicationName,
Sender.getThisInstance,
creator
)
message.setMethod(restfulURI + "entrance/" + execID + "/log")
val taskID = jobRequest.getId
message
.data("execID", execID)
.data("log", logList)
.data("websocketTag", websocketTagJobID.get(job.getId))
.data("taskID", taskID)
sendMsg(job, message)
}