def getJobStateFromArchiveFile()

in streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala [501:548]


  /**
   * Resolves the state of a job from its archive file.
   *
   * The archive directory is read from `trackId.properties` under the key
   * `JobManagerOptions.ARCHIVE_DIR`, and the archive is looked up at `<archiveDir>/<jobId>`.
   * Two kinds of archived entries are consulted:
   *  - `/jobs/<jobId>/exceptions`: when the entry carries a string `root-exception`, that text is
   *    written (UTF-8) to the file named by `KubernetesDeploymentHelper.getJobErrorLog(jobId)`.
   *    This is best-effort and never influences the returned state.
   *  - `/jobs/overview`: the `state` of the first job whose `jid` equals `trackId.jobId` is the
   *    result.
   *
   * Any exception raised while resolving the state (including a null `trackId.jobId`) is absorbed
   * and reported as `FAILED_STATE`; this method does not throw for non-fatal errors.
   *
   * @param trackId identifies the tracked job; `trackId.jobId` is expected to be non-null.
   * @return the archived `state` string of the job, or `FAILED_STATE` when the archive directory
   *         is not configured, the archive is empty or unreadable, or the job is not listed in
   *         any overview entry. NOTE(review): when the job is listed but has no string `state`,
   *         null is returned (unchanged legacy behaviour) - confirm callers tolerate it.
   */
  def getJobStateFromArchiveFile(trackId: TrackId): String = Try {
    require(trackId.jobId != null, "[StreamPark] getJobStateFromArchiveFile: JobId cannot be null.")
    val archiveDir =
      trackId.properties.getProperty(JobManagerOptions.ARCHIVE_DIR.key)
    if (archiveDir == null) {
      FAILED_STATE
    } else {
      val archivePath = new Path(archiveDir, trackId.jobId)
      // Materialize once so both passes below see the same entries; an empty archive simply
      // falls through to FAILED_STATE.
      val archived = FsJobArchivist.getArchivedJsons(archivePath).toList
      val exceptionsPath = s"/jobs/${trackId.jobId}/exceptions"

      // Pass 1: persist the root exception. This runs independently of where the overview entry
      // sits in the archive, and a failure here must not hide the job state resolved in pass 2.
      archived
        .filter(_.getPath == exceptionsPath)
        .foreach {
          entry =>
            Try(parse(entry.getJson)).toOption
              .flatMap(json => (json \ "root-exception").extractOpt[String])
              .foreach {
                rootException =>
                  Try {
                    val path = KubernetesDeploymentHelper.getJobErrorLog(trackId.jobId)
                    Files.asCharSink(new File(path), Charsets.UTF_8).write(rootException)
                    path
                  } match {
                    case Success(path) => logInfo(" error path: " + path)
                    case Failure(e) =>
                      logInfo(
                        s"[StreamPark] getJobStateFromArchiveFile: failed to write error log for job ${trackId.jobId}: ${e.getMessage}")
                  }
              }
        }

      // Pass 2: collect the job entries of every parseable overview, in archive order.
      val overviewJobs = archived
        .filter(_.getPath == "/jobs/overview")
        .flatMap {
          entry =>
            Try(parse(entry.getJson)).toOption.toList.flatMap {
              json =>
                json \ "jobs" match {
                  case JArray(jobs) => jobs
                  case _ => Nil
                }
            }
        }

      overviewJobs
        .find(job => (job \ "jid").extractOpt[String].contains(trackId.jobId))
        .map(job => (job \ "state").extractOpt[String].orNull)
        .getOrElse(FAILED_STATE)
    }
  }.getOrElse(FAILED_STATE)