in streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala [480:520]
/**
 * Resolves the state of a job from its Flink archive file.
 *
 * The archive is read from `Workspace.ARCHIVES_FILE_PATH/<jobId>`. Two archived entries are
 * used:
 *   - `/jobs/<jobId>/exceptions`: its `root-exception` text, when present, is written to the
 *     file returned by `KubernetesDeploymentHelper.getJobErrorLog(jobId)`.
 *   - `/jobs/overview`: the `state` of the job whose `jid` equals `jobId` is the result.
 *
 * The two steps are independent of the order of the entries inside the archive: the exception
 * log is always persisted when available, and a failure to persist it never changes the
 * returned state.
 *
 * @param jobId
 *   the Flink job id; a `null` id yields `FAILED_STATE`
 * @return
 *   the archived job state, or `FAILED_STATE` when the archive is empty or unreadable, when no
 *   overview entry contains the job, or when the matching job carries no `state` field
 */
def getJobStateFromArchiveFile(jobId: String): String = {

  // Best-effort: persisting the diagnostic log must not influence the resolved job state, so any
  // parse/extract/IO failure is deliberately contained here instead of reaching the outer Try.
  def persistRootException(json: String): Unit = {
    Try {
      (parse(json) \ "root-exception").extractOpt[String].flatMap(Option(_)).foreach {
        log =>
          val file = new File(KubernetesDeploymentHelper.getJobErrorLog(jobId))
          Files.asCharSink(file, Charsets.UTF_8).write(log)
      }
    }
    ()
  }

  // State of the first job in one overview document whose jid matches; None when the document
  // cannot be parsed, has no "jobs" array, or does not list the job.
  def stateFromOverview(json: String): Option[String] =
    Try(parse(json)).toOption.flatMap {
      ast =>
        ast \ "jobs" match {
          case JArray(jobs) =>
            jobs
              .find(job => (job \ "jid").extractOpt[String].contains(jobId))
              .map {
                job =>
                  // A matching job without a usable state is reported like every other
                  // "cannot determine" case rather than as null.
                  (job \ "state").extractOpt[String].flatMap(Option(_)).getOrElse(FAILED_STATE)
              }
          case _ => None
        }
    }

  Try {
    require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
    val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
    val archives = FsJobArchivist.getArchivedJsons(archivePath)
    val exceptionsPath = s"/jobs/$jobId/exceptions"

    // Step 1: always persist the root exception, wherever its entry sits in the archive.
    archives.foreach {
      a =>
        if (a.getPath == exceptionsPath) {
          persistRootException(a.getJson)
        }
    }

    // Step 2: the first overview entry that lists the job decides the state.
    archives
      .filter(_.getPath == "/jobs/overview")
      .flatMap(a => stateFromOverview(a.getJson))
      .headOption
      .getOrElse(FAILED_STATE)
  }.getOrElse(FAILED_STATE)
}