in streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala [147:230]
/**
 * Amends a pod template so that the jars behind `jarHttpUrls` are downloaded into the Flink lib
 * directory by an init container and exposed to the Flink main container.
 *
 * When `jarHttpUrls` is empty, `oriPod` is returned untouched. Otherwise the pod held by `oriPod`
 * (or a new `Pod` when it is `None`) is modified IN PLACE and returned:
 *  - the metadata name is set to "pod-template";
 *  - a "userlib-loader" busybox init container fetches every url with `wget` into
 *    "/opt/flink/lib", which it mounts from the "flink-usrlib" volume;
 *  - the "flink-main-container" container (created when absent) receives one sub-path mount per
 *    distinct jar file name;
 *  - an emptyDir volume named "flink-usrlib" is registered on the pod spec.
 *
 * Entries previously added under the same names are replaced rather than appended, so applying
 * this method more than once to the same pod does not accumulate duplicates.
 *
 * @param oriPod      optional user supplied pod template; mutated when jars have to be injected
 * @param jarHttpUrls http urls of the jars to download; the file name of each jar is derived
 *                    with `pathLastSegment`
 * @return an effect yielding `oriPod` as-is when there is nothing to inject, otherwise
 *         `Some` of the amended pod
 */
private def correctPodSpec(oriPod: Option[Pod], jarHttpUrls: Array[String]): UIO[Option[Pod]] = ZIO.succeed {
  if (jarHttpUrls.isEmpty) oriPod
  else {
    val userLibVolume     = "flink-usrlib"
    val flinkLibDir       = "/opt/flink/lib"
    val loaderName        = "userlib-loader"
    val mainContainerName = "flink-main-container"

    // Single-quote a value for `sh -c`, escaping embedded single quotes, so that urls carrying
    // characters such as '&', ';' or spaces are passed to wget verbatim.
    def shQuote(raw: String): String = "'" + raw.replace("'", "'\\''") + "'"

    val pod = oriPod.getOrElse(new Pod())

    // handle metadata
    val metadata = Option(pod.getMetadata).getOrElse(new ObjectMeta())
    metadata.setName("pod-template")
    pod.setMetadata(metadata)

    val spec = Option(pod.getSpec).getOrElse(new PodSpec())

    // handle initContainers
    val downloadScript = jarHttpUrls
      .map { url =>
        val target = s"$flinkLibDir/${pathLastSegment(url)}"
        s"wget ${shQuote(url)} -O ${shQuote(target)}"
      }
      .mkString(" && ")
    val libLoaderInitContainer = new ContainerBuilder()
      .withName(loaderName)
      .withImage("busybox:1.35.0")
      .withCommand("sh", "-c", downloadScript)
      .withVolumeMounts(
        new VolumeMountBuilder()
          .withName(userLibVolume)
          .withMountPath(flinkLibDir)
          .build
      )
      .build
    val initContainers: util.List[Container] =
      Option(spec.getInitContainers).getOrElse(new util.ArrayList[Container]())
    // Replace a loader left by an earlier invocation instead of adding a second one.
    initContainers.removeIf((c: Container) => c.getName == loaderName)
    initContainers.add(libLoaderInitContainer)
    spec.setInitContainers(initContainers)

    // handle containers
    // Built as a real java list (no implicit view) so it stays mutable once attached to a
    // container; file names are de-duplicated so that no mount path is emitted twice.
    val jarMounts = new util.ArrayList[VolumeMount]()
    jarHttpUrls
      .map(url => pathLastSegment(url))
      .distinct
      .foreach { jarName =>
        jarMounts.add(
          new VolumeMountBuilder()
            .withName(userLibVolume)
            .withMountPath(s"$flinkLibDir/$jarName")
            .withSubPath(jarName)
            .build)
      }
    val containers: util.List[Container] =
      Option(spec.getContainers).getOrElse(new util.ArrayList[Container]())
    val mainContainer = (0 until containers.size()).iterator
      .map(i => containers.get(i))
      .find(_.getName == mainContainerName)
    mainContainer match {
      case Some(container) =>
        val mounts: util.List[VolumeMount] =
          Option(container.getVolumeMounts).getOrElse(new util.ArrayList[VolumeMount]())
        // Drop mounts identical to the ones about to be added (relies on VolumeMount equality;
        // with identity equality this is simply a no-op), then append the fresh set.
        mounts.removeAll(jarMounts)
        mounts.addAll(jarMounts)
        container.setVolumeMounts(mounts)
      case None =>
        containers.add(
          new ContainerBuilder()
            .withName(mainContainerName)
            .withVolumeMounts(jarMounts)
            .build)
    }
    spec.setContainers(containers)

    // handle volumes
    val volumes: util.List[Volume] = Option(spec.getVolumes).getOrElse(new util.ArrayList[Volume]())
    // Keep exactly one volume with this name.
    volumes.removeIf((v: Volume) => v.getName == userLibVolume)
    volumes.add(
      new VolumeBuilder()
        .withName(userLibVolume)
        .withEmptyDir(new EmptyDirVolumeSource())
        .build
    )
    spec.setVolumes(volumes)

    pod.setSpec(spec)
    Some(pod)
  }
}