in streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala [501:548]
/**
 * Recovers a job's final state from the Flink history-server archive on the
 * filesystem configured by `jobmanager.archive.fs.dir`.
 *
 * While scanning the archived JSON entries it also performs a best-effort side
 * effect: when the `/jobs/<jobId>/exceptions` entry carries a `root-exception`,
 * the exception text is written to a local error-log file for later retrieval.
 *
 * @param trackId tracked job identity; `jobId` must be non-null and
 *                `properties` may carry the archive-dir option.
 * @return the archived state of the matching job from `/jobs/overview`, or
 *         FAILED_STATE when the archive dir is unset, the archive is empty or
 *         unparsable, the job is not present, or its entry has no "state" field.
 */
def getJobStateFromArchiveFile(trackId: TrackId): String = Try {
  require(trackId.jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
  val archiveDir =
    trackId.properties.getProperty(JobManagerOptions.ARCHIVE_DIR.key)
  if (archiveDir == null) {
    // No archive directory configured: nothing to look up.
    FAILED_STATE
  } else {
    val archivePath = new Path(archiveDir, trackId.jobId)
    FsJobArchivist.getArchivedJsons(archivePath) match {
      case r if r.isEmpty => FAILED_STATE
      case r =>
        r.foreach {
          a =>
            if (a.getPath == s"/jobs/${trackId.jobId}/exceptions") {
              // Best effort: persist the archived root exception to a local
              // error-log file; parse failures are deliberately ignored.
              Try(parse(a.getJson)) match {
                case Success(ok) =>
                  val log = (ok \ "root-exception").extractOpt[String].orNull
                  if (log != null) {
                    val path =
                      KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
                    val file = new File(path)
                    Files.asCharSink(file, Charsets.UTF_8).write(log)
                    logInfo(" error path: " + path)
                  }
                case Failure(_) =>
              }
            } else if (a.getPath == "/jobs/overview") {
              Try(parse(a.getJson)) match {
                case Success(ok) =>
                  ok \ "jobs" match {
                    case JNothing | JNull =>
                    case JArray(arr) =>
                      arr.foreach(x => {
                        val jid = (x \ "jid").extractOpt[String].orNull
                        if (jid == trackId.jobId) {
                          // Nonlocal return from the enclosing method. This is
                          // safe inside the outer Try: it is implemented via
                          // NonLocalReturnControl (a ControlThrowable), which
                          // Try's NonFatal handler does not intercept.
                          // FIX: previously `.orNull` could leak a null state
                          // to callers; fall back to FAILED_STATE instead.
                          return (x \ "state").extractOpt[String].getOrElse(FAILED_STATE)
                        }
                      })
                    case _ =>
                  }
                case Failure(_) =>
              }
            }
        }
        // Scanned every archived entry without finding this job's state.
        FAILED_STATE
    }
  }
}.getOrElse(FAILED_STATE)