override def untrack()

in streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/observer/FlinkK8sObserver.scala [145:195]


  override def untrack(key: TrackKey): UIO[Unit] = {

    def unTrackCluster(ns: String, name: String): UIO[Unit] = for {
      _ <- deployCrObserver.unWatch(ns, name)
      _ <- restSvcEndpointObserver.unWatch(ns, name)
      _ <- clusterObserver.unWatch(ns, name)
    } yield ()

    def unTrackSessionJob(ns: String, name: String) = {
      sessionJobCRObserver.unWatch(ns, name)
    }

    def unTrackPureCluster(ns: String, name: String) = unTrackCluster(ns, name).whenZIO {
      trackedKeys.toSet
        .map(set =>
          // When a flink cluster is referenced by another resource, tracking of that cluster is maintained.
          set.find {
            case k: ApplicationJobKey if k.namespace == ns && k.name == name                  => true
            case k: SessionJobKey if k.namespace == ns && k.clusterName == name               => true
            case k: UnmanagedSessionJobKey if k.clusterNamespace == ns && k.clusterId == name => true
            case _                                                                            => false
          })
        .map(_.isEmpty)
    }

    def unTrackUnmanagedSessionJob(clusterNs: String, clusterName: String) =
      unTrackCluster(clusterNs, clusterName).whenZIO {
        trackedKeys.toSet
          .map(set =>
            // When a flink cluster is referenced by another resource, tracking of that cluster is maintained.
            set.find {
              case k: ApplicationJobKey if k.namespace == clusterNs && k.name == clusterName    => true
              case k: SessionJobKey if k.namespace == clusterNs && k.clusterName == clusterName => true
              case k: ClusterKey if k.namespace == clusterNs && k.name == clusterName           => true
              case _                                                                            => false
            })
          .map(_.isEmpty)
      }.unit

    for {
      _ <- key match {
             case ApplicationJobKey(_, ns, name)                       => unTrackCluster(ns, name)
             case SessionJobKey(_, ns, name, _)                        => unTrackSessionJob(ns, name)
             case ClusterKey(_, ns, name)                              => unTrackPureCluster(ns, name)
             case UnmanagedSessionJobKey(_, clusterNs, clusterName, _) =>
               unTrackUnmanagedSessionJob(clusterNs, clusterName)
           }
      _ <- trackedKeys.remove(key)
      _ <- logInfo(s"Stop watching Flink resource: $key")
    } yield ()
  }