def getJobStateFromArchiveFile(jobId: String): String

in streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala [480:520]


  /**
   * Best-effort recovery of a job's terminal state from the Flink history archive on the
   * shared filesystem. A missing archive, IO error or parse failure degrades to FAILED_STATE.
   * As a side effect, the archived root exception (if present) is written to the local
   * error-log file resolved by KubernetesDeploymentHelper.getJobErrorLog.
   */
  def getJobStateFromArchiveFile(jobId: String): String = Try {
    require(jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
    // The history archive for a job is a single file named after the job id under
    // Workspace.ARCHIVES_FILE_PATH; its entries mirror the Flink REST API paths.
    val archivePath = new Path(Workspace.ARCHIVES_FILE_PATH, jobId)
    FsJobArchivist.getArchivedJsons(archivePath) match {
      // No archived entries: nothing to recover, report the job as failed.
      case r if r.isEmpty => FAILED_STATE
      case r =>
        r.foreach {
          a =>
            // The /jobs/<jobId>/exceptions entry carries the root exception of the job;
            // persist it to the local error-log file so it can be read back later.
            if (a.getPath == s"/jobs/$jobId/exceptions") {
              Try(parse(a.getJson)) match {
                case Success(ok) =>
                  val log = (ok \ "root-exception").extractOpt[String].orNull
                  if (log != null) {
                    val path = KubernetesDeploymentHelper.getJobErrorLog(jobId)
                    val file = new File(path)
                    Files.asCharSink(file, Charsets.UTF_8).write(log)
                  }
                case _ =>
              }
            } else if (a.getPath == "/jobs/overview") {
              // The /jobs/overview entry lists every archived job together with its final state.
              Try(parse(a.getJson)) match {
                case Success(ok) =>
                  ok \ "jobs" match {
                    case JNothing | JNull =>
                    case JArray(arr) =>
                      arr.foreach(
                        x => {
                          val jid = (x \ "jid").extractOpt[String].orNull
                          if (jid == jobId) {
                            // Non-local return from the enclosing method with the archived
                            // state; Try does not intercept it, so getOrElse is bypassed.
                            return (x \ "state").extractOpt[String].orNull
                          }
                        })
                    case _ =>
                  }
                case Failure(_) =>
              }
            }
        }
        // The job id was not present in the overview entry: fall back to FAILED_STATE.
        FAILED_STATE
    }
  }.getOrElse(FAILED_STATE) // any exception while reading or parsing the archive => FAILED_STATE
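
For reference, below is a minimal, self-contained sketch of the /jobs/overview parsing step above, exercised against a hand-written payload with json4s. The object name ArchiveOverviewParseSketch, the helper stateOf and the sample JSON are illustrative assumptions, not part of StreamPark; the real method brings parse and an implicit Formats into scope elsewhere in the file.

// Sketch only: shows how an archived /jobs/overview JSON can be probed for the state of a
// single job id. Assumes json4s-jackson on the classpath; names and data are hypothetical.
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object ArchiveOverviewParseSketch extends App {

  implicit val formats: Formats = DefaultFormats

  // Hand-written stand-in for an archived /jobs/overview entry.
  val overviewJson: String =
    """{
      |  "jobs": [
      |    { "jid": "a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6", "name": "demo",  "state": "FINISHED" },
      |    { "jid": "00000000000000000000000000000000", "name": "other", "state": "FAILED" }
      |  ]
      |}""".stripMargin

  // Returns the archived state of the given job id, or None if the id is not listed.
  def stateOf(jobId: String, json: String): Option[String] =
    parse(json) \ "jobs" match {
      case JArray(jobs) =>
        jobs.collectFirst {
          case job if (job \ "jid").extractOpt[String].contains(jobId) =>
            (job \ "state").extractOpt[String]
        }.flatten
      case _ => None
    }

  println(stateOf("a1b2c3d4e5f6a7b8c9d0e1f2a3b4c5d6", overviewJson)) // Some(FINISHED)
  println(stateOf("missing-job-id", overviewJson)) // None
}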