def isDeleted()

in resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala [149:384]


  def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
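
A minimal, self-contained sketch (not part of the source file) of the membership check that
isDeleted performs: the String executor id is parsed to a Long and looked up in the set of ids
this allocator has already marked for deletion. IsDeletedSketch and its hard-coded set are
illustrative assumptions; a non-numeric id would make executorId.toLong throw a
NumberFormatException.

  object IsDeletedSketch {
    def main(args: Array[String]): Unit = {
      // Stand-in for the allocator's deletedExecutorIds state.
      val deletedExecutorIds: Set[Long] = Set(3L, 7L)
      def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)
      println(isDeleted("7"))   // true
      println(isDeleted("11"))  // false
    }
  }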

  protected def onNewSnapshots(
      applicationId: String,
      schedulerBackend: KubernetesClusterSchedulerBackend,
      snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
    logDebug(s"Received ${snapshots.size} snapshots")
    val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct
    newlyCreatedExecutors --= k8sKnownExecIds
    schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds

    // Although this function is going to delete some executors due to timeout, the actual
    // deletion can take an indeterminate amount of time. Hence, we should collect all PVCs
    // in use at the beginning. A false positive is okay in this context in order to be safe.
    val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod =>
      pod.getSpec.getVolumes.asScala
        .flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) }
    }.distinct

    // Transfer executor requests that the scheduler backend already knows about from
    // newlyCreatedExecutors to schedulerKnownNewlyCreatedExecs.
    val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet
    schedulerKnownNewlyCreatedExecs ++=
      newlyCreatedExecutors.filter { case (k, _) => schedulerKnownExecs.contains(k) }
        .map { case (k, v) => (k, v._1) }
    newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet

    // For all executors we've created against the API but have not seen in a snapshot
    // yet - check the current time. If the current time has exceeded some threshold,
    // assume that the pod was either never created (the API server never properly
    // handled the creation request), or the API server created the pod but we missed
    // both the creation and deletion events. In either case, delete the missing pod
    // if possible, and mark such a pod to be rescheduled below.
    val currentTime = clock.getTimeMillis()
    val timedOut = newlyCreatedExecutors.flatMap { case (execId, (_, timeCreated)) =>
      if (currentTime - timeCreated > podCreationTimeout) {
        Some(execId)
      } else {
        logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
          s" was created ${currentTime - timeCreated} milliseconds ago.")
        None
      }
    }

    if (timedOut.nonEmpty) {
      logWarning(log"Executors with ids ${MDC(LogKeys.EXECUTOR_IDS, timedOut.mkString(","))}} " +
        log"were not detected in the Kubernetes cluster after " +
        log"${MDC(LogKeys.TIMEOUT, podCreationTimeout)} ms despite the fact that a previous " +
        log"allocation attempt tried to create them. The executors may have been deleted but the " +
        log"application missed the deletion event.")

      newlyCreatedExecutors --= timedOut
      if (shouldDeleteExecutors) {
        Utils.tryLogNonFatalError {
          kubernetesClient
            .pods()
            .inNamespace(namespace)
            .withLabel(SPARK_APP_ID_LABEL, applicationId)
            .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
            .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
            .delete()
        }
      }
    }

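    // Remember the most recent snapshot; the allocation logic below works against this
    // consolidated view of the executor pods.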
    if (snapshots.nonEmpty) {
      lastSnapshot = snapshots.last
    }

    // Make a local, non-volatile copy of the reference since it's used multiple times. This
    // is the only method that modifies the list, so this is safe.
    var _deletedExecutorIds = deletedExecutorIds
    if (snapshots.nonEmpty) {
      val existingExecs = lastSnapshot.executorPods.keySet
      _deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
    }

    val notDeletedPods = lastSnapshot.executorPods
      .filter { case (k, _) => !_deletedExecutorIds.contains(k) }
    // Group the pods by ResourceProfile id so we can check each ResourceProfile separately;
    // take a fast path when no custom ResourceProfiles are in use.
    val rpIdToExecsAndPodState =
      mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]()
    if (totalExpectedExecutorsPerResourceProfileId.size <= 1) {
      rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) =
        mutable.HashMap.empty ++= notDeletedPods
    } else {
      notDeletedPods.foreach { case (execId, execPodState) =>
        val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt
        val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId,
          mutable.HashMap[Long, ExecutorPodState]())
        execPods(execId) = execPodState
      }
    }

    // sum of all the pending pods unknown to the scheduler (total across all resource profiles)
    var totalPendingCount = 0
    // total non-running pods (including scheduler known & unknown, pending & newly requested ones)
    var totalNotRunningPodCount = 0
    val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
      .asScala
      .toSeq
      .sortBy(_._1)
      .flatMap { case (rpId, targetNum) =>
      val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)

      val currentRunningCount = podsForRpId.values.count {
        case PodRunning(_) => true
        case _ => false
      }

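      // Split this profile's pending pods into those the scheduler backend already knows about
      // and those it does not.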
      val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) = podsForRpId.filter {
        case (_, PodPending(_)) => true
        case _ => false
      }.partition { case (k, _) =>
        schedulerKnownExecs.contains(k)
      }
      // This variable is used later to print some debug logs. It's updated when cleaning up
      // excess pod requests, since currentPendingExecutorsForRpId is immutable.
      var pendingCountForRpId = currentPendingExecutorsForRpId.size

      val newlyCreatedExecutorsForRpId =
        newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
          rpId == waitingRpId
        }

      val schedulerKnownNewlyCreatedExecsForRpId =
        schedulerKnownNewlyCreatedExecs.filter { case (_, waitingRpId) =>
          rpId == waitingRpId
        }

      if (podsForRpId.nonEmpty) {
        logDebug(s"ResourceProfile Id: $rpId (" +
          s"pod allocation status: $currentRunningCount running, " +
          s"${currentPendingExecutorsForRpId.size} unknown pending, " +
          s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
          s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
          s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created)")
      }

      // It's possible that we have outstanding pods that are outdated when dynamic allocation
      // decides to downscale the application. So check if we can release any pending pods early
      // instead of waiting for them to time out. Drop them first from the unacknowledged list,
      // then from the pending. However, in order to prevent too frequent fluctuation, newly
      // requested pods are protected during executorIdleTimeout period.
      //
      // TODO: with dynamic allocation off, handle edge cases if we end up with more running
      // executors than expected.
      var notRunningPodCountForRpId =
        currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
        newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
      val podCountForRpId = currentRunningCount + notRunningPodCountForRpId

      if (podCountForRpId > targetNum) {
        val excess = podCountForRpId - targetNum
        val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
          .filter { case (_, (_, createTime)) =>
            currentTime - createTime > executorIdleTimeout
          }.keys.take(excess).toList
        val pendingToDelete = currentPendingExecutorsForRpId
          .filter(x => isExecutorIdleTimedOut(x._2, currentTime))
          .take(excess - newlyCreatedToDelete.size)
          .map { case (id, _) => id }
        val toDelete = newlyCreatedToDelete ++ pendingToDelete

        if (toDelete.nonEmpty) {
          logInfo(log"Deleting ${MDC(LogKeys.COUNT, toDelete.size)} excess pod requests " +
            log"(${MDC(LogKeys.RESOURCE_PROFILE_IDS, toDelete.mkString(","))}).")
          _deletedExecutorIds = _deletedExecutorIds ++ toDelete

          Utils.tryLogNonFatalError {
            kubernetesClient
              .pods()
              .inNamespace(namespace)
              .withField("status.phase", "Pending")
              .withLabel(SPARK_APP_ID_LABEL, applicationId)
              .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
              .withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
              .delete()
            newlyCreatedExecutors --= newlyCreatedToDelete
            pendingCountForRpId -= pendingToDelete.size
            notRunningPodCountForRpId -= toDelete.size
          }
        }
      }
      totalPendingCount += pendingCountForRpId
      totalNotRunningPodCount += notRunningPodCountForRpId

      // The code below just prints debug messages, which are only useful when there's a change
      // in the snapshot state. Since the messages are a little spammy, avoid them when we know
      // there are no useful updates.
      if (log.isDebugEnabled && snapshots.nonEmpty) {
        if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
          logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
            "equal to the number of requested executors. Not scaling up further.")
        } else {
          if (newlyCreatedExecutorsForRpId.nonEmpty) {
            logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " +
              s"ResourceProfile Id $rpId before requesting more.")
          }
        }
      }
      if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
        Some(rpId, podCountForRpId, targetNum)
      } else {
        // for this resource profile we do not request more PODs
        None
      }
    }

    // Try to request new executors only when there are remaining slots within the maximum
    // number of pending pods and, when waiting for existing PVCs to be released, only after
    // a new snapshot has arrived.
    val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount
    if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0 &&
        !(snapshots.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get())) {
      ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
        .foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
        val numMissingPodsForRpId = targetNum - podCountForRpId
        val numExecutorsToAllocate =
          math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
        logInfo(log"Going to request ${MDC(LogKeys.COUNT, numExecutorsToAllocate)} executors from" +
          log" Kubernetes for ResourceProfile Id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}, " +
          log"target: ${MDC(LogKeys.NUM_POD_TARGET, targetNum)}, " +
          log"known: ${MDC(LogKeys.NUM_POD, podCountForRpId)}, sharedSlotFromPendingPods: " +
          log"${MDC(LogKeys.NUM_POD_SHARED_SLOT, sharedSlotFromPendingPods)}.")
        requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
      }
    }
    deletedExecutorIds = _deletedExecutorIds

    // Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
    // update method when not needed. PODs known by the scheduler backend are not counted here as
    // they are considered running PODs and should not block upscaling.
    numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
  }
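
For reference, the number of pods requested per ResourceProfile in a single pass is bounded by
three values at once: the pods still missing for that profile, the per-pass batch size
(podAllocationSize), and the profile's share of the remaining pending-pod budget produced by
ExecutorPodsAllocator.splitSlots. Below is a minimal, runnable sketch of that arithmetic; the
concrete numbers are made-up assumptions, not values taken from the source.

  object AllocationMathSketch {
    def main(args: Array[String]): Unit = {
      val targetNum = 20                 // desired executors for this ResourceProfile
      val podCountForRpId = 12           // running + pending + newly requested pods already known
      val podAllocationSize = 5          // upper bound on pods requested per allocation pass
      val sharedSlotFromPendingPods = 3  // this profile's share of the remaining pending-pod budget

      // Mirrors the expression in onNewSnapshots: take the smallest of the three bounds.
      val numExecutorsToAllocate =
        math.min(math.min(targetNum - podCountForRpId, podAllocationSize), sharedSlotFromPendingPods)
      println(s"Would request $numExecutorsToAllocate executors")  // prints 3 for these inputs
    }
  }

The pending-pod budget itself is maxPendingPods minus totalNotRunningPodCount, so a profile can
only scale up while that difference stays positive.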