in resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala [104:301]
override def configurePod(pod: SparkPod): SparkPod = {
val name = s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}"
val configMapName = KubernetesClientUtils.configMapNameExecutor
val confFilesMap = KubernetesClientUtils
.buildSparkConfDirFilesMap(configMapName, kubernetesConf.sparkConf, Map.empty)
val keyToPaths = KubernetesClientUtils.buildKeyToPathObjects(confFilesMap)
// According to
// https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names,
// hostname must be no longer than `KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters,
// so take the last 63 characters of the pod name as the hostname.
// This preserves uniqueness since the end of name contains executorId
val hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
// Remove non-word characters from the start of the hostname
.replaceAll("^[^\\w]+", "")
// Replace dangerous characters in the remaining string with a safe alternative.
.replaceAll("[^\\w-]+", "_")
val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
val executorCpuQuantity = new Quantity(executorCoresRequest)
val executorResourceQuantities =
buildExecutorResourcesQuantities(execResources.customResources.values.toSet)
val executorEnv: Seq[EnvVar] = {
val sparkAuthSecret = Option(secMgr.getSecretKey()).map {
case authSecret: String if kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty =>
Seq(SecurityManager.ENV_AUTH_SECRET -> authSecret)
case _ => Nil
}.getOrElse(Nil)
val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
kubernetesConf.executorId)
Utils.splitCommandString(subsOpts)
}
val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
SparkConf.isExecutorStartupConf)
val allOpts = (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
(s"$ENV_JAVA_OPT_PREFIX$index", opt)
}.toMap
val attributes = if (kubernetesConf.get(UI.CUSTOM_EXECUTOR_LOG_URL).isDefined) {
Map(
ENV_EXECUTOR_ATTRIBUTE_APP_ID -> kubernetesConf.appId,
ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> kubernetesConf.executorId)
} else {
Map.empty[String, String]
}
KubernetesUtils.buildEnvVars(
Seq(
ENV_DRIVER_URL -> driverUrl,
ENV_EXECUTOR_CORES -> execResources.cores.get.toString,
ENV_EXECUTOR_MEMORY -> executorMemoryString,
ENV_APPLICATION_ID -> kubernetesConf.appId,
// This is to set the SPARK_CONF_DIR to be /opt/spark/conf
ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
ENV_EXECUTOR_ID -> kubernetesConf.executorId,
ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString)
++ attributes
++ kubernetesConf.environment
++ sparkAuthSecret
++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
++ allOpts) ++
KubernetesUtils.buildEnvVarsWithFieldRef(
Seq(
(ENV_EXECUTOR_POD_IP, "v1", "status.podIP"),
(ENV_EXECUTOR_POD_NAME, "v1", "metadata.name")
))
}
executorEnv.find(_.getName == ENV_EXECUTOR_DIRS).foreach { e =>
e.setValue(e.getValue
.replaceAll(ENV_APPLICATION_ID, kubernetesConf.appId)
.replaceAll(ENV_EXECUTOR_ID, kubernetesConf.executorId))
}
// 0 is invalid as kubernetes containerPort request, we shall leave it unmounted
val requiredPorts = if (blockManagerPort != 0) {
Seq(
(BLOCK_MANAGER_PORT_NAME, blockManagerPort))
.map { case (name, port) =>
new ContainerPortBuilder()
.withName(name)
.withContainerPort(port)
.build()
}
} else Nil
if (!isDefaultProfile) {
if (pod.container != null && pod.container.getResources() != null) {
logDebug("NOT using the default profile and removing template resources")
pod.container.setResources(new ResourceRequirements())
}
}
val executorContainer = new ContainerBuilder(pod.container)
.withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
.withImage(executorContainerImage)
.withImagePullPolicy(kubernetesConf.imagePullPolicy)
.editOrNewResources()
.addToRequests("memory", executorMemoryQuantity)
.addToLimits("memory", executorMemoryQuantity)
.addToRequests("cpu", executorCpuQuantity)
.addToLimits(executorResourceQuantities.asJava)
.endResources()
.addNewEnv()
.withName(ENV_SPARK_USER)
.withValue(Utils.getCurrentUserName())
.endEnv()
.addAllToEnv(executorEnv.asJava)
.addAllToPorts(requiredPorts.asJava)
.addToArgs("executor")
.build()
val executorContainerWithConfVolume = if (disableConfigMap) {
executorContainer
} else {
new ContainerBuilder(executorContainer)
.addNewVolumeMount()
.withName(SPARK_CONF_VOLUME_EXEC)
.withMountPath(SPARK_CONF_DIR_INTERNAL)
.endVolumeMount()
.build()
}
val containerWithLimitCores = if (isDefaultProfile) {
executorLimitCores.map { limitCores =>
val executorCpuLimitQuantity = new Quantity(limitCores)
new ContainerBuilder(executorContainerWithConfVolume)
.editResources()
.addToLimits("cpu", executorCpuLimitQuantity)
.endResources()
.build()
}.getOrElse(executorContainerWithConfVolume)
} else {
executorContainerWithConfVolume
}
val containerWithLifecycle =
if (!kubernetesConf.workerDecommissioning) {
logInfo("Decommissioning not enabled, skipping shutdown script")
containerWithLimitCores
} else {
logInfo("Adding decommission script to lifecycle")
new ContainerBuilder(containerWithLimitCores).withNewLifecycle()
.withNewPreStop()
.withNewExec()
.addToCommand(kubernetesConf.get(DECOMMISSION_SCRIPT))
.endExec()
.endPreStop()
.endLifecycle()
.build()
}
val ownerReference = kubernetesConf.driverPod.map { pod =>
new OwnerReferenceBuilder()
.withController(true)
.withApiVersion(pod.getApiVersion)
.withKind(pod.getKind)
.withName(pod.getMetadata.getName)
.withUid(pod.getMetadata.getUid)
.build()
}
val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
case "statefulset" => "Always"
case _ => "Never"
}
val executorPodBuilder = new PodBuilder(pod.pod)
.editOrNewMetadata()
.withName(name)
.addToLabels(kubernetesConf.labels.asJava)
.addToAnnotations(kubernetesConf.annotations.asJava)
.addToOwnerReferences(ownerReference.toSeq: _*)
.endMetadata()
.editOrNewSpec()
.withHostname(hostname)
.withRestartPolicy(policy)
.addToNodeSelector(kubernetesConf.nodeSelector.asJava)
.addToNodeSelector(kubernetesConf.executorNodeSelector.asJava)
.addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*)
val executorPod = if (disableConfigMap) {
executorPodBuilder.endSpec().build()
} else {
executorPodBuilder
.addNewVolume()
.withName(SPARK_CONF_VOLUME_EXEC)
.withNewConfigMap()
.withItems(keyToPaths.asJava)
.withName(configMapName)
.endConfigMap()
.endVolume()
.endSpec()
.build()
}
kubernetesConf.schedulerName
.foreach(executorPod.getSpec.setSchedulerName)
SparkPod(executorPod, containerWithLifecycle)
}