private def correctPodSpec()

in streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala [147:230]


  private def correctPodSpec(oriPod: Option[Pod], jarHttpUrls: Array[String]): UIO[Option[Pod]] = ZIO.succeed {
    if (jarHttpUrls.isEmpty) oriPod
    else {
      val pod = oriPod.getOrElse(new Pod())

      // handle metadata
      val metadata = Option(pod.getMetadata).getOrElse(new ObjectMeta())
      metadata.setName("pod-template")
      pod.setMetadata(metadata)

      val spec = Option(pod.getSpec).getOrElse(new PodSpec())

      // handle initContainers
      val initContainers: util.List[Container] = Option(spec.getInitContainers).getOrElse(new util.ArrayList())

      val libLoaderInitContainer = new ContainerBuilder()
        .withName("userlib-loader")
        .withImage("busybox:1.35.0")
        .withCommand(
          "sh",
          "-c",
          jarHttpUrls.map(url => s"wget $url -O /opt/flink/lib/${pathLastSegment(url)}").mkString(" && "))
        .withVolumeMounts(
          new VolumeMountBuilder()
            .withName("flink-usrlib")
            .withMountPath("/opt/flink/lib")
            .build
        )
        .build

      initContainers.add(libLoaderInitContainer)

      spec.setInitContainers(initContainers)

      // handle containers
      val flinkMainContainerVolMounts: util.List[VolumeMount] =
        jarHttpUrls
          .map(url => pathLastSegment(url))
          .map(jarName =>
            new VolumeMountBuilder()
              .withName("flink-usrlib")
              .withMountPath(s"/opt/flink/lib/$jarName")
              .withSubPath(jarName)
              .build)
          .toList

      val containers: util.List[Container] = Option(spec.getContainers).getOrElse(new util.ArrayList())

      containers.zipWithIndex
        .find { case (e, _) => e.getName == "flink-main-container" }
        .map { case (e, idx) =>
          val volMounts = Option(e.getVolumeMounts)
            .map { mounts =>
              mounts.addAll(flinkMainContainerVolMounts)
              mounts
            }
            .getOrElse(flinkMainContainerVolMounts)
          e.setVolumeMounts(volMounts)
          containers.set(idx, e)
        }
        .getOrElse(
          containers.add(
            new ContainerBuilder()
              .withName("flink-main-container")
              .withVolumeMounts(flinkMainContainerVolMounts)
              .build)
        )

      spec.setContainers(containers)

      // handle volumes
      val volumes: util.List[Volume] = Option(spec.getVolumes).getOrElse(new util.ArrayList())
      volumes.add(
        new VolumeBuilder()
          .withName("flink-usrlib")
          .withEmptyDir(new EmptyDirVolumeSource())
          .build
      )
      spec.setVolumes(volumes)

      pod.setSpec(spec)
      Some(pod)
    }
  }