override def doWatch()

in streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala [94:161]


  override def doWatch(): Unit = {
    this.synchronized {
      // Fetch all ids currently under tracking. A failed lookup or an empty
      // result means there is nothing to do this round, so the whole cycle is
      // skipped. This Option/foreach shape replaces the previous
      // `getOrElse(return)`, which performed a nonlocal return from inside a
      // lambda (implemented by throwing NonLocalReturnControl; deprecated and
      // removed in Scala 3).
      Try(watchController.getAllWatchingIds()).toOption.filter(_.nonEmpty).foreach { trackIds =>
        // 1) k8s application mode: poll each tracked job individually.
        val appFuture: Set[Future[Option[JobStatusCV]]] =
          trackIds.filter(_.executeMode == FlinkK8sDeployMode.APPLICATION).map {
            id =>
              val future = Future(touchApplicationJob(id))
              future.onComplete(_.getOrElse(None) match {
                case Some(jobState) =>
                  // carry the freshly discovered jobId back onto the track id
                  updateState(id.copy(jobId = jobState.jobId), jobState)
                case _ => // poll failed or produced nothing: keep previous state
              })
              future
          }

        // 2) k8s session mode: several jobs may share one session cluster.
        val sessionIds =
          trackIds.filter(_.executeMode == FlinkK8sDeployMode.SESSION)
        // NOTE(review): groupBy followed by flatMap over the grouped values is
        // an identity transformation — every session id survives. If the intent
        // was one representative id per cluster, `_._2.head` would be needed;
        // kept as-is to preserve behavior.
        val sessionCluster =
          sessionIds.groupBy(_.toClusterKey.toString).flatMap(_._2).toSet
        val sessionFuture = sessionCluster.map {
          trackId =>
            val future = Future(touchSessionAllJob(trackId))
            future.onComplete(_.toOption match {
              case Some(map) =>
                // look up this track id's job among all jobs of its cluster
                map.find(_._1.jobId == trackId.jobId) match {
                  case Some(job) =>
                    updateState(job._1.copy(appId = trackId.appId), job._2)
                  case _ =>
                    // absent from the bulk listing: probe the job directly
                    touchSessionJob(trackId) match {
                      case Some(state) =>
                        if (FlinkJobState.isEndState(state.jobState)) {
                          // can't find that job in the k8s cluster.
                          watchController.unWatching(trackId)
                        }
                        eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state))
                      case _ =>
                    }
                }
              case _ => // bulk listing failed: nothing to update this round
            })
            future
        }

        // blocking until all future are completed or timeout is reached
        Try(Await.result(Future.sequence(appFuture), conf.requestTimeoutSec seconds)).failed.map {
          _ =>
            logWarn(
              s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes native application mode timeout," +
                s" limitSeconds=${conf.requestTimeoutSec}," +
                s" trackIds=${trackIds.mkString(",")}")
        }

        Try(Await.result(Future.sequence(sessionFuture), conf.requestTimeoutSec seconds)).failed.map {
          _ =>
            logWarn(
              s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes native session mode timeout," +
                s" limitSeconds=${conf.requestTimeoutSec}," +
                s" trackIds=${trackIds.mkString(",")}")
        }
      }
    }
  }