in streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala [145:195]
override def untrack(key: TrackKey): UIO[Unit] = {
def unTrackCluster(ns: String, name: String): UIO[Unit] = for {
_ <- deployCrObserver.unWatch(ns, name)
_ <- restSvcEndpointObserver.unWatch(ns, name)
_ <- clusterObserver.unWatch(ns, name)
} yield ()
def unTrackSessionJob(ns: String, name: String) = {
sessionJobCRObserver.unWatch(ns, name)
}
def unTrackPureCluster(ns: String, name: String) = unTrackCluster(ns, name).whenZIO {
trackedKeys.toSet
.map(set =>
// When a flink cluster is referenced by another resource, tracking of that cluster is maintained.
set.find {
case k: ApplicationJobKey if k.namespace == ns && k.name == name => true
case k: SessionJobKey if k.namespace == ns && k.clusterName == name => true
case k: UnmanagedSessionJobKey if k.clusterNamespace == ns && k.clusterId == name => true
case _ => false
})
.map(_.isEmpty)
}
def unTrackUnmanagedSessionJob(clusterNs: String, clusterName: String) =
unTrackCluster(clusterNs, clusterName).whenZIO {
trackedKeys.toSet
.map(set =>
// When a flink cluster is referenced by another resource, tracking of that cluster is maintained.
set.find {
case k: ApplicationJobKey if k.namespace == clusterNs && k.name == clusterName => true
case k: SessionJobKey if k.namespace == clusterNs && k.clusterName == clusterName => true
case k: ClusterKey if k.namespace == clusterNs && k.name == clusterName => true
case _ => false
})
.map(_.isEmpty)
}.unit
for {
_ <- key match {
case ApplicationJobKey(_, ns, name) => unTrackCluster(ns, name)
case SessionJobKey(_, ns, name, _) => unTrackSessionJob(ns, name)
case ClusterKey(_, ns, name) => unTrackPureCluster(ns, name)
case UnmanagedSessionJobKey(_, clusterNs, clusterName, _) =>
unTrackUnmanagedSessionJob(clusterNs, clusterName)
}
_ <- trackedKeys.remove(key)
_ <- logInfo(s"Stop watching Flink resource: $key")
} yield ()
}