in linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/conversions/TaskConversions.scala [220:349]
/**
 * Converts a [[JobHistory]] record into a [[QueryTaskVO]].
 *
 * Besides copying the plain fields, this method:
 *   - resolves engine type, run type and creator from the job's label JSON (the job's own engine
 *     type wins when it is set),
 *   - derives request/cost timings, reuse flag and engine instance from the job's metrics JSON,
 *   - builds the "stronger" exec id from the entrance name and the job's instances,
 *   - builds a "-"-joined summary of the values found in the job's source JSON.
 *
 * @param job
 *   the job history record to convert; may be null
 * @param subjobs
 *   sub job details; currently unused, sub jobs are deliberately not attached for performance
 * @return
 *   the populated VO, or null when `job` is null
 */
def jobHistory2TaskVO(job: JobHistory, subjobs: util.List[SubJobDetail]): QueryTaskVO = {
  if (null == job) return null

  val taskVO = new QueryTaskVO
  taskVO.setTaskID(job.getId)
  taskVO.setInstance(job.getInstances)
  taskVO.setExecId(job.getJobReqId)
  taskVO.setUmUser(job.getSubmitUser)
  taskVO.setExecuteUser(job.getExecuteUser)
  taskVO.setProgress(job.getProgress)
  taskVO.setLogPath(job.getLogPath)
  taskVO.setStatus(job.getStatus)
  taskVO.setResultLocation(job.getResultLocation)
  // Defensive copies: java.util.Date is mutable, so the VO must not share instances with the
  // entity. These values are intentionally not overwritten with the raw references later on.
  if (null != job.getCreatedTime) taskVO.setCreatedTime(new Date(job.getCreatedTime.getTime))
  if (null != job.getUpdatedTime) taskVO.setUpdatedTime(new Date(job.getUpdatedTime.getTime))

  // ---- labels -------------------------------------------------------------------------------
  val labelList = getLabelListFromJson(job.getLabels)
  // Treated as possibly null/empty everywhere below.
  val hasLabels = null != labelList && labelList.size() > 0
  val jobEngineType = job.getEngineType
  // Labels are only consulted for the engine type when the job itself does not carry one.
  val engineType =
    if (null == jobEngineType && hasLabels) LabelUtil.getEngineType(labelList)
    else jobEngineType
  val codeType = if (hasLabels) LabelUtil.getCodeType(labelList) else ""
  val creator =
    if (hasLabels) Option(LabelUtil.getUserCreator(labelList)).map(_._2).getOrElse("")
    else ""

  taskVO.setEngineType(engineType)
  taskVO.setExecuteApplicationName(job.getEngineType)
  taskVO.setRequestApplicationName(creator)
  taskVO.setRunType(codeType)
  taskVO.setParamsJson(job.getParams)
  taskVO.setErrCode(job.getErrorCode)
  taskVO.setErrDesc(job.getErrorDesc)

  val labelStringList = new util.ArrayList[String]()
  // Guarded so a job without a label list yields an empty list instead of an NPE.
  if (null != labelList) {
    labelList.foreach(label => labelStringList.add(label.getLabelKey + ":" + label.getStringValue))
  }
  taskVO.setLabels(labelStringList)

  // ---- metrics ------------------------------------------------------------------------------
  val metrics =
    BDPJettyServerHelper.gson.fromJson(job.getMetrics, classOf[util.Map[String, Object]])

  // String form of a metric entry; None when there are no metrics or the entry is absent/null.
  def metricString(key: String): Option[String] =
    Option(metrics).flatMap(m => Option(m.get(key))).map(_.toString)

  // The submit time serves both as the request start and as the start of the cost-time window,
  // so it is parsed only once.
  val submitTime: Date =
    metricString(TaskConstant.JOB_SUBMIT_TIME).map(s => dealString2Date(s)).orNull
  val scheduleTime: Date =
    metricString(TaskConstant.JOB_SCHEDULE_TIME).map(s => dealString2Date(s)).orNull
  val completeTime: Date =
    metricString(TaskConstant.JOB_COMPLETE_TIME).map(s => dealString2Date(s)).orNull

  metricString(TaskConstant.JOB_IS_REUSE).foreach { reuse =>
    taskVO.setIsReuse(BooleanUtils.toBoolean(reuse))
  }

  if (null != submitTime) taskVO.setRequestStartTime(submitTime)
  if (null != scheduleTime) taskVO.setRequestEndTime(scheduleTime)
  if (null != submitTime && null != scheduleTime) {
    taskVO.setRequestSpendTime(scheduleTime.getTime - submitTime.getTime)
  }

  if (null != submitTime) {
    // End of the cost window: the current time for unfinished jobs; for finished jobs the
    // recorded complete time, then the job's updated time, then the current time as last resort.
    val endMillis =
      if (!isJobFinished(job.getStatus)) System.currentTimeMillis()
      else if (null != completeTime) completeTime.getTime
      else if (null != job.getUpdatedTime) job.getUpdatedTime.getTime
      else System.currentTimeMillis()
    taskVO.setCostTime(endMillis - submitTime.getTime)
  }

  // A metric entry whose value is null is handled like a missing entry rather than failing on
  // toString. Failed jobs without an engine instance are flagged as retryable.
  metricString(TaskConstant.ENGINE_INSTANCE) match {
    case Some(engineInstance) => taskVO.setEngineInstance(engineInstance)
    case None =>
      if (TaskStatus.Failed.toString.equals(job.getStatus)) taskVO.setCanRetry(true)
  }

  // ---- exec id ------------------------------------------------------------------------------
  val jobInstances = job.getInstances
  // Without instances there is nothing to split; leave the stronger exec id unset.
  if (null != jobInstances) {
    val entranceName = JobhistoryConfiguration.ENTRANCE_SPRING_NAME.getValue
    val instances = jobInstances.split(JobhistoryConfiguration.ENTRANCE_INSTANCE_DELEMITER.getValue)
    taskVO.setStrongerExecId(
      ZuulEntranceUtils.generateExecID(job.getJobReqId, entranceName, instances)
    )
  }

  if (StringUtils.isNotBlank(job.getExecutionCode)) {
    taskVO.setExecutionCode(job.getExecutionCode)
  }
  // Do not attach subjobs for performance
  // taskVO.setSubJobs(subjobs)

  // ---- source -------------------------------------------------------------------------------
  taskVO.setSourceJson(job.getSource)
  if (StringUtils.isNotBlank(job.getSource)) {
    // Best effort: a source that cannot be read as a string map only produces a warning and
    // leaves the tailor unset.
    Utils.tryCatch {
      val source =
        BDPJettyServerHelper.gson.fromJson(job.getSource, classOf[util.Map[String, String]])
      taskVO.setSourceTailor(source.map(_._2).mkString("-"))
    } { case _ =>
      logger.warn("sourceJson deserialization failed, this task may be the old data.")
    }
  }

  taskVO.setObserveInfo(job.getObserveInfo)
  taskVO.setMetrics(job.getMetrics)
  taskVO
}