in streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkJobStatusWatcher.scala [94:161]
/**
 * Runs one status-tracking round over every id returned by
 * `watchController.getAllWatchingIds()`.
 *
 * Application-mode ids are probed with `touchApplicationJob`. Session-mode ids are probed with
 * `touchSessionAllJob`; when the tracked job is missing from that listing, `touchSessionJob` is
 * used as a fallback, the id is un-watched if the reported state is an end state, and a
 * `FlinkJobStatusChangeEvent` is posted.
 *
 * The round runs while holding this watcher's monitor. It blocks until every probe, including
 * its follow-up state update, has completed, or until `conf.requestTimeoutSec` seconds have
 * elapsed (the limit is applied once per mode). Timeouts and probe failures are logged, never
 * rethrown. Nothing happens when the id lookup fails or returns no ids.
 */
override def doWatch(): Unit = {
  this.synchronized {

    // Waits for all futures of one deploy mode. A timeout and any other failure are reported
    // separately so that a failing probe is not mislabelled as a timeout.
    def awaitAll[T](futures: Set[Future[T]], mode: String, ids: Iterable[_]): Unit =
      Try(Await.result(Future.sequence(futures), conf.requestTimeoutSec.seconds)).failed.foreach {
        case _: java.util.concurrent.TimeoutException =>
          logWarn(
            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes native $mode mode timeout," +
              s" limitSeconds=${conf.requestTimeoutSec}," +
              s" trackIds=${ids.mkString(",")}")
        case e =>
          logWarn(
            s"[FlinkJobStatusWatcher] tracking flink job status on kubernetes native $mode mode failed," +
              s" cause=$e," +
              s" trackIds=${ids.mkString(",")}")
      }

    // get all legal tracking ids; a failed lookup or an empty result skips the round
    // (Try#foreach replaces the former non-local `return`)
    Try(watchController.getAllWatchingIds()).filter(_.nonEmpty).foreach {
      trackIds =>
        // 1) k8s application mode
        val appIds = trackIds.filter(_.executeMode == FlinkK8sDeployMode.APPLICATION)
        val appFuture: Set[Future[Option[JobStatusCV]]] = appIds.map {
          id =>
            // andThen (not onComplete): the resulting future completes only after the state
            // update has run, so the Await below also covers the update.
            Future(touchApplicationJob(id)).andThen {
              case result =>
                result
                  .getOrElse(None)
                  .foreach(jobState => updateState(id.copy(jobId = jobState.jobId), jobState))
            }
        }

        // 2) k8s session mode
        // Every session id is probed individually because the follow-up step looks up that
        // id's own job in the returned listing.
        val sessionIds = trackIds.filter(_.executeMode == FlinkK8sDeployMode.SESSION)
        val sessionFuture = sessionIds.map {
          trackId =>
            Future(touchSessionAllJob(trackId)).andThen {
              case result =>
                result.foreach {
                  jobs =>
                    jobs.find(_._1.jobId == trackId.jobId) match {
                      case Some((jobTrackId, jobState)) =>
                        updateState(jobTrackId.copy(appId = trackId.appId), jobState)
                      case None =>
                        touchSessionJob(trackId).foreach {
                          state =>
                            if (FlinkJobState.isEndState(state.jobState)) {
                              // can't find that job in the k8s cluster.
                              watchController.unWatching(trackId)
                            }
                            eventBus.postSync(FlinkJobStatusChangeEvent(trackId, state))
                        }
                    }
                }
            }
        }

        // blocking until all future are completed or timeout is reached
        awaitAll(appFuture, "application", appIds)
        awaitAll(sessionFuture, "session", sessionIds)
    }
  }
}