override def configurePod()

in resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala [104:301]


  override def configurePod(pod: SparkPod): SparkPod = {
    val name = s"$executorPodNamePrefix-exec-${kubernetesConf.executorId}"
    val configMapName = KubernetesClientUtils.configMapNameExecutor
    val confFilesMap = KubernetesClientUtils
      .buildSparkConfDirFilesMap(configMapName, kubernetesConf.sparkConf, Map.empty)
    val keyToPaths = KubernetesClientUtils.buildKeyToPathObjects(confFilesMap)
    // According to
    // https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#dns-label-names,
    // hostname must be no longer than `KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH`(63) characters,
    // so take the last 63 characters of the pod name as the hostname.
    // This preserves uniqueness since the end of name contains executorId
    val hostname = name.substring(Math.max(0, name.length - KUBERNETES_DNS_LABEL_NAME_MAX_LENGTH))
      // Remove non-word characters from the start of the hostname
      .replaceAll("^[^\\w]+", "")
      // Replace dangerous characters in the remaining string with a safe alternative.
      .replaceAll("[^\\w-]+", "_")

    val executorMemoryQuantity = new Quantity(s"${execResources.totalMemMiB}Mi")
    val executorCpuQuantity = new Quantity(executorCoresRequest)
    val executorResourceQuantities =
      buildExecutorResourcesQuantities(execResources.customResources.values.toSet)

    val executorEnv: Seq[EnvVar] = {
      val sparkAuthSecret = Option(secMgr.getSecretKey()).map {
        case authSecret: String if kubernetesConf.get(AUTH_SECRET_FILE_EXECUTOR).isEmpty =>
          Seq(SecurityManager.ENV_AUTH_SECRET -> authSecret)
        case _ => Nil
      }.getOrElse(Nil)

      val userOpts = kubernetesConf.get(EXECUTOR_JAVA_OPTIONS).toSeq.flatMap { opts =>
        val subsOpts = Utils.substituteAppNExecIds(opts, kubernetesConf.appId,
          kubernetesConf.executorId)
          Utils.splitCommandString(subsOpts)
      }

      val sparkOpts = Utils.sparkJavaOpts(kubernetesConf.sparkConf,
        SparkConf.isExecutorStartupConf)

      val allOpts = (userOpts ++ sparkOpts).zipWithIndex.map { case (opt, index) =>
        (s"$ENV_JAVA_OPT_PREFIX$index", opt)
      }.toMap

      val attributes = if (kubernetesConf.get(UI.CUSTOM_EXECUTOR_LOG_URL).isDefined) {
        Map(
          ENV_EXECUTOR_ATTRIBUTE_APP_ID -> kubernetesConf.appId,
          ENV_EXECUTOR_ATTRIBUTE_EXECUTOR_ID -> kubernetesConf.executorId)
      } else {
        Map.empty[String, String]
      }

      KubernetesUtils.buildEnvVars(
        Seq(
          ENV_DRIVER_URL -> driverUrl,
          ENV_EXECUTOR_CORES -> execResources.cores.get.toString,
          ENV_EXECUTOR_MEMORY -> executorMemoryString,
          ENV_APPLICATION_ID -> kubernetesConf.appId,
          // This is to set the SPARK_CONF_DIR to be /opt/spark/conf
          ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL,
          ENV_EXECUTOR_ID -> kubernetesConf.executorId,
          ENV_RESOURCE_PROFILE_ID -> resourceProfile.id.toString)
          ++ attributes
          ++ kubernetesConf.environment
          ++ sparkAuthSecret
          ++ Seq(ENV_CLASSPATH -> kubernetesConf.get(EXECUTOR_CLASS_PATH).orNull)
          ++ allOpts) ++
      KubernetesUtils.buildEnvVarsWithFieldRef(
        Seq(
          (ENV_EXECUTOR_POD_IP, "v1", "status.podIP"),
          (ENV_EXECUTOR_POD_NAME, "v1", "metadata.name")
        ))
    }
    executorEnv.find(_.getName == ENV_EXECUTOR_DIRS).foreach { e =>
      e.setValue(e.getValue
        .replaceAll(ENV_APPLICATION_ID, kubernetesConf.appId)
        .replaceAll(ENV_EXECUTOR_ID, kubernetesConf.executorId))
    }

    // 0 is invalid as kubernetes containerPort request, we shall leave it unmounted
    val requiredPorts = if (blockManagerPort != 0) {
      Seq(
        (BLOCK_MANAGER_PORT_NAME, blockManagerPort))
        .map { case (name, port) =>
          new ContainerPortBuilder()
            .withName(name)
            .withContainerPort(port)
            .build()
        }
    } else Nil

    if (!isDefaultProfile) {
      if (pod.container != null && pod.container.getResources() != null) {
        logDebug("NOT using the default profile and removing template resources")
        pod.container.setResources(new ResourceRequirements())
      }
    }

    val executorContainer = new ContainerBuilder(pod.container)
      .withName(Option(pod.container.getName).getOrElse(DEFAULT_EXECUTOR_CONTAINER_NAME))
      .withImage(executorContainerImage)
      .withImagePullPolicy(kubernetesConf.imagePullPolicy)
      .editOrNewResources()
        .addToRequests("memory", executorMemoryQuantity)
        .addToLimits("memory", executorMemoryQuantity)
        .addToRequests("cpu", executorCpuQuantity)
        .addToLimits(executorResourceQuantities.asJava)
        .endResources()
      .addNewEnv()
        .withName(ENV_SPARK_USER)
        .withValue(Utils.getCurrentUserName())
        .endEnv()
      .addAllToEnv(executorEnv.asJava)
      .addAllToPorts(requiredPorts.asJava)
      .addToArgs("executor")
      .build()
    val executorContainerWithConfVolume = if (disableConfigMap) {
      executorContainer
    } else {
      new ContainerBuilder(executorContainer)
        .addNewVolumeMount()
          .withName(SPARK_CONF_VOLUME_EXEC)
          .withMountPath(SPARK_CONF_DIR_INTERNAL)
          .endVolumeMount()
        .build()
    }
    val containerWithLimitCores = if (isDefaultProfile) {
      executorLimitCores.map { limitCores =>
        val executorCpuLimitQuantity = new Quantity(limitCores)
        new ContainerBuilder(executorContainerWithConfVolume)
          .editResources()
          .addToLimits("cpu", executorCpuLimitQuantity)
          .endResources()
          .build()
      }.getOrElse(executorContainerWithConfVolume)
    } else {
      executorContainerWithConfVolume
    }
    val containerWithLifecycle =
      if (!kubernetesConf.workerDecommissioning) {
        logInfo("Decommissioning not enabled, skipping shutdown script")
        containerWithLimitCores
      } else {
        logInfo("Adding decommission script to lifecycle")
        new ContainerBuilder(containerWithLimitCores).withNewLifecycle()
          .withNewPreStop()
            .withNewExec()
              .addToCommand(kubernetesConf.get(DECOMMISSION_SCRIPT))
            .endExec()
          .endPreStop()
          .endLifecycle()
          .build()
      }
    val ownerReference = kubernetesConf.driverPod.map { pod =>
      new OwnerReferenceBuilder()
        .withController(true)
        .withApiVersion(pod.getApiVersion)
        .withKind(pod.getKind)
        .withName(pod.getMetadata.getName)
        .withUid(pod.getMetadata.getUid)
        .build()
    }

    val policy = kubernetesConf.get(KUBERNETES_ALLOCATION_PODS_ALLOCATOR) match {
      case "statefulset" => "Always"
      case _ => "Never"
    }

    val executorPodBuilder = new PodBuilder(pod.pod)
      .editOrNewMetadata()
        .withName(name)
        .addToLabels(kubernetesConf.labels.asJava)
        .addToAnnotations(kubernetesConf.annotations.asJava)
        .addToOwnerReferences(ownerReference.toSeq: _*)
        .endMetadata()
      .editOrNewSpec()
        .withHostname(hostname)
        .withRestartPolicy(policy)
        .addToNodeSelector(kubernetesConf.nodeSelector.asJava)
        .addToNodeSelector(kubernetesConf.executorNodeSelector.asJava)
        .addToImagePullSecrets(kubernetesConf.imagePullSecrets: _*)
    val executorPod = if (disableConfigMap) {
      executorPodBuilder.endSpec().build()
    } else {
      executorPodBuilder
        .addNewVolume()
          .withName(SPARK_CONF_VOLUME_EXEC)
          .withNewConfigMap()
            .withItems(keyToPaths.asJava)
            .withName(configMapName)
            .endConfigMap()
          .endVolume()
        .endSpec()
      .build()
    }
    kubernetesConf.schedulerName
      .foreach(executorPod.getSpec.setSchedulerName)

    SparkPod(executorPod, containerWithLifecycle)
  }