in resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala [149:384]
// Whether this allocator has already marked the given executor's pod for deletion.
def isDeleted(executorId: String): Boolean = deletedExecutorIds.contains(executorId.toLong)

protected def onNewSnapshots(
applicationId: String,
schedulerBackend: KubernetesClusterSchedulerBackend,
snapshots: Seq[ExecutorPodsSnapshot]): Unit = {
logDebug(s"Received ${snapshots.size} snapshots")
val k8sKnownExecIds = snapshots.flatMap(_.executorPods.keys).distinct
newlyCreatedExecutors --= k8sKnownExecIds
schedulerKnownNewlyCreatedExecs --= k8sKnownExecIds
// Although this function may delete some executors due to timeout, the actual deletion can
// take an arbitrary amount of time. Hence, collect all PVCs in use at the beginning.
// False positives are okay in this context in order to stay on the safe side.
val k8sKnownPVCNames = snapshots.flatMap(_.executorPods.values.map(_.pod)).flatMap { pod =>
pod.getSpec.getVolumes.asScala
.flatMap { v => Option(v.getPersistentVolumeClaim).map(_.getClaimName) }
}.distinct
// Transfer executor requests that the scheduler backend now knows about from
// newlyCreatedExecutors to schedulerKnownNewlyCreatedExecs.
val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet
schedulerKnownNewlyCreatedExecs ++=
newlyCreatedExecutors.filter { case (k, _) => schedulerKnownExecs.contains(k) }
.map { case (k, v) => (k, v._1) }
newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
// For all executors we've created against the API but have not seen in a snapshot
// yet - check the current time. If the current time has exceeded some threshold,
// assume that the pod was either never created (the API server never properly
// handled the creation request), or the API server created the pod but we missed
// both the creation and deletion events. In either case, delete the missing pod
// if possible, and mark such a pod to be rescheduled below.
val currentTime = clock.getTimeMillis()
val timedOut = newlyCreatedExecutors.flatMap { case (execId, (_, timeCreated)) =>
if (currentTime - timeCreated > podCreationTimeout) {
Some(execId)
} else {
logDebug(s"Executor with id $execId was not found in the Kubernetes cluster since it" +
s" was created ${currentTime - timeCreated} milliseconds ago.")
None
}
}
if (timedOut.nonEmpty) {
logWarning(log"Executors with ids ${MDC(LogKeys.EXECUTOR_IDS, timedOut.mkString(","))}} " +
log"were not detected in the Kubernetes cluster after " +
log"${MDC(LogKeys.TIMEOUT, podCreationTimeout)} ms despite the fact that a previous " +
log"allocation attempt tried to create them. The executors may have been deleted but the " +
log"application missed the deletion event.")
newlyCreatedExecutors --= timedOut
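// If executor deletion is enabled, also ask the API server to remove any pods that may in fact
// exist for the timed-out ids, scoped to this application's executor pods via label selectors.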
if (shouldDeleteExecutors) {
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
.delete()
}
}
}
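// Remember the most recent snapshot so that later invocations, which may carry no snapshots,
// can still reason about the last known cluster state.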
if (snapshots.nonEmpty) {
lastSnapshot = snapshots.last
}
// Make a local, non-volatile copy of the reference since it's used multiple times. This
// is the only method that modifies the set, so this is safe.
var _deletedExecutorIds = deletedExecutorIds
if (snapshots.nonEmpty) {
val existingExecs = lastSnapshot.executorPods.keySet
_deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
}
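// Ignore pods that this allocator has already asked Kubernetes to delete when counting
// running and pending executors below.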
val notDeletedPods = lastSnapshot.executorPods
.filter { case (k, _) => !_deletedExecutorIds.contains(k) }
// Group the pods by ResourceProfile id so each profile can be checked separately, with a
// fast path when no ResourceProfiles other than the default one are in use.
val rpIdToExecsAndPodState =
mutable.HashMap[Int, mutable.HashMap[Long, ExecutorPodState]]()
if (totalExpectedExecutorsPerResourceProfileId.size <= 1) {
rpIdToExecsAndPodState(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID) =
mutable.HashMap.empty ++= notDeletedPods
} else {
notDeletedPods.foreach { case (execId, execPodState) =>
val rpId = execPodState.pod.getMetadata.getLabels.get(SPARK_RESOURCE_PROFILE_ID_LABEL).toInt
val execPods = rpIdToExecsAndPodState.getOrElseUpdate(rpId,
mutable.HashMap[Long, ExecutorPodState]())
execPods(execId) = execPodState
}
}
// Sum of all pending pods not yet known to the scheduler (total across all ResourceProfiles).
var totalPendingCount = 0
// Total number of pods that are not running yet (scheduler known & unknown, pending & newly
// requested ones).
var totalNotRunningPodCount = 0
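// Walk the ResourceProfiles in id order: count their running and not-yet-running pods, delete
// excess pending requests when downscaling, and collect the profiles that still need more pods.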
val podsToAllocateWithRpId = totalExpectedExecutorsPerResourceProfileId
.asScala
.toSeq
.sortBy(_._1)
.flatMap { case (rpId, targetNum) =>
val podsForRpId = rpIdToExecsAndPodState.getOrElse(rpId, mutable.HashMap.empty)
val currentRunningCount = podsForRpId.values.count {
case PodRunning(_) => true
case _ => false
}
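// Split the pending pods by whether the scheduler backend has already registered the executor;
// only the unregistered ones count towards the pending total used for the outstanding-pods flag.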
val (schedulerKnownPendingExecsForRpId, currentPendingExecutorsForRpId) = podsForRpId.filter {
case (_, PodPending(_)) => true
case _ => false
}.partition { case (k, _) =>
schedulerKnownExecs.contains(k)
}
// This variable is used later to print some debug logs. It's updated when cleaning up
// excess pod requests, since currentPendingExecutorsForRpId is immutable.
var pendingCountForRpId = currentPendingExecutorsForRpId.size
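// Newly requested executors for this profile, split by whether the scheduler backend already
// knows about them.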
val newlyCreatedExecutorsForRpId =
newlyCreatedExecutors.filter { case (_, (waitingRpId, _)) =>
rpId == waitingRpId
}
val schedulerKnownNewlyCreatedExecsForRpId =
schedulerKnownNewlyCreatedExecs.filter { case (_, waitingRpId) =>
rpId == waitingRpId
}
if (podsForRpId.nonEmpty) {
logDebug(s"ResourceProfile Id: $rpId (" +
s"pod allocation status: $currentRunningCount running, " +
s"${currentPendingExecutorsForRpId.size} unknown pending, " +
s"${schedulerKnownPendingExecsForRpId.size} scheduler backend known pending, " +
s"${newlyCreatedExecutorsForRpId.size} unknown newly created, " +
s"${schedulerKnownNewlyCreatedExecsForRpId.size} scheduler backend known newly created)")
}
// It's possible that we have outstanding pods that are outdated when dynamic allocation
// decides to downscale the application. So check if we can release any pending pods early
// instead of waiting for them to time out. Drop them first from the unacknowledged list,
// then from the pending. However, in order to prevent too frequent fluctuation, newly
// requested pods are protected during executorIdleTimeout period.
//
// TODO: with dynamic allocation off, handle edge cases if we end up with more running
// executors than expected.
var notRunningPodCountForRpId =
currentPendingExecutorsForRpId.size + schedulerKnownPendingExecsForRpId.size +
newlyCreatedExecutorsForRpId.size + schedulerKnownNewlyCreatedExecsForRpId.size
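// Everything that exists or has been requested for this profile, whether running or not.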
val podCountForRpId = currentRunningCount + notRunningPodCountForRpId
if (podCountForRpId > targetNum) {
val excess = podCountForRpId - targetNum
val newlyCreatedToDelete = newlyCreatedExecutorsForRpId
.filter { case (_, (_, createTime)) =>
currentTime - createTime > executorIdleTimeout
}.keys.take(excess).toList
val pendingToDelete = currentPendingExecutorsForRpId
.filter(x => isExecutorIdleTimedOut(x._2, currentTime))
.take(excess - newlyCreatedToDelete.size)
.map { case (id, _) => id }
val toDelete = newlyCreatedToDelete ++ pendingToDelete
if (toDelete.nonEmpty) {
logInfo(log"Deleting ${MDC(LogKeys.COUNT, toDelete.size)} excess pod requests " +
log"(${MDC(LogKeys.RESOURCE_PROFILE_IDS, toDelete.mkString(","))}).")
_deletedExecutorIds = _deletedExecutorIds ++ toDelete
Utils.tryLogNonFatalError {
kubernetesClient
.pods()
.inNamespace(namespace)
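// Only delete pods that are still Pending, so a pod that started running in the meantime is
// never removed.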
.withField("status.phase", "Pending")
.withLabel(SPARK_APP_ID_LABEL, applicationId)
.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
.withLabelIn(SPARK_EXECUTOR_ID_LABEL, toDelete.sorted.map(_.toString): _*)
.delete()
newlyCreatedExecutors --= newlyCreatedToDelete
pendingCountForRpId -= pendingToDelete.size
notRunningPodCountForRpId -= toDelete.size
}
}
}
totalPendingCount += pendingCountForRpId
totalNotRunningPodCount += notRunningPodCountForRpId
// The code below just prints debug messages, which are only useful when there's a change
// in the snapshot state. Since the messages are a little spammy, avoid them when we know
// there are no useful updates.
if (log.isDebugEnabled && snapshots.nonEmpty) {
if (currentRunningCount >= targetNum && !dynamicAllocationEnabled) {
logDebug(s"Current number of running executors for ResourceProfile Id $rpId is " +
"equal to the number of requested executors. Not scaling up further.")
} else {
if (newlyCreatedExecutorsForRpId.nonEmpty) {
logDebug(s"Still waiting for ${newlyCreatedExecutorsForRpId.size} executors for " +
s"ResourceProfile Id $rpId before requesting more.")
}
}
}
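// Request more pods for this profile only once every previous request has been observed (no
// unacknowledged newly created executors remain) and the pod count is still below the target.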
if (newlyCreatedExecutorsForRpId.isEmpty && podCountForRpId < targetNum) {
Some((rpId, podCountForRpId, targetNum))
} else {
// for this resource profile we do not request more PODs
None
}
}
// Request new executors only when there are remaining slots under the maximum number of
// pending pods; when pod allocation depends on PVCs and the PVC limit has been reached, also
// wait for a new snapshot, which may signal that existing PVCs have been released.
val remainingSlotFromPendingPods = maxPendingPods - totalNotRunningPodCount
if (remainingSlotFromPendingPods > 0 && podsToAllocateWithRpId.size > 0 &&
!(snapshots.isEmpty && podAllocOnPVC && maxPVCs <= PVC_COUNTER.get())) {
ExecutorPodsAllocator.splitSlots(podsToAllocateWithRpId, remainingSlotFromPendingPods)
.foreach { case ((rpId, podCountForRpId, targetNum), sharedSlotFromPendingPods) =>
val numMissingPodsForRpId = targetNum - podCountForRpId
val numExecutorsToAllocate =
math.min(math.min(numMissingPodsForRpId, podAllocationSize), sharedSlotFromPendingPods)
logInfo(log"Going to request ${MDC(LogKeys.COUNT, numExecutorsToAllocate)} executors from" +
log" Kubernetes for ResourceProfile Id: ${MDC(LogKeys.RESOURCE_PROFILE_ID, rpId)}, " +
log"target: ${MDC(LogKeys.NUM_POD_TARGET, targetNum)}, " +
log"known: ${MDC(LogKeys.NUM_POD, podCountForRpId)}, sharedSlotFromPendingPods: " +
log"${MDC(LogKeys.NUM_POD_SHARED_SLOT, sharedSlotFromPendingPods)}.")
requestNewExecutors(numExecutorsToAllocate, applicationId, rpId, k8sKnownPVCNames)
}
}
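// Publish the locally updated deleted-executor set back to the volatile field.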
deletedExecutorIds = _deletedExecutorIds
// Update the flag that helps the setTotalExpectedExecutors() callback avoid triggering this
// update method when not needed. PODs known by the scheduler backend are not counted here,
// as they are considered running PODs and should not block upscaling.
numOutstandingPods.set(totalPendingCount + newlyCreatedExecutors.size)
}