in flink-kubernetes-standalone/src/main/java/org/apache/flink/kubernetes/operator/kubeclient/factory/StandaloneKubernetesTaskManagerFactory.java [44:95]
public static Deployment buildKubernetesTaskManagerDeployment(
FlinkPod podTemplate,
StandaloneKubernetesTaskManagerParameters kubernetesTaskManagerParameters) {
FlinkPod flinkPod = Preconditions.checkNotNull(podTemplate).copy();
final KubernetesStepDecorator[] stepDecorators =
new KubernetesStepDecorator[] {
new InitStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
new EnvSecretsDecorator(kubernetesTaskManagerParameters),
new MountSecretsDecorator(kubernetesTaskManagerParameters),
new CmdStandaloneTaskManagerDecorator(kubernetesTaskManagerParameters),
new HadoopConfMountDecorator(kubernetesTaskManagerParameters),
new KerberosMountDecorator(kubernetesTaskManagerParameters),
new FlinkConfMountDecorator(kubernetesTaskManagerParameters)
};
for (KubernetesStepDecorator stepDecorator : stepDecorators) {
flinkPod = stepDecorator.decorateFlinkPod(flinkPod);
}
final Pod resolvedPod =
new PodBuilder(flinkPod.getPodWithoutMainContainer())
.editOrNewSpec()
.addToContainers(flinkPod.getMainContainer())
.endSpec()
.build();
return new DeploymentBuilder()
.withApiVersion(Constants.APPS_API_VERSION)
.editOrNewMetadata()
.withName(
StandaloneKubernetesUtils.getTaskManagerDeploymentName(
kubernetesTaskManagerParameters.getClusterId()))
.withAnnotations(kubernetesTaskManagerParameters.getAnnotations())
.withLabels(kubernetesTaskManagerParameters.getLabels())
.withOwnerReferences(
kubernetesTaskManagerParameters.getOwnerReference().stream()
.map(e -> KubernetesOwnerReference.fromMap(e).getInternalResource())
.collect(Collectors.toList()))
.endMetadata()
.editOrNewSpec()
.withReplicas(kubernetesTaskManagerParameters.getReplicas())
.editOrNewTemplate()
.withMetadata(resolvedPod.getMetadata())
.withSpec(resolvedPod.getSpec())
.endTemplate()
.editOrNewSelector()
.addToMatchLabels(kubernetesTaskManagerParameters.getSelectors())
.endSelector()
.endSpec()
.build();
}