in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/config/FlinkConfigBuilder.java [199:250]
/**
 * Resolves the JobManager and TaskManager pod templates and registers them in the effective
 * configuration.
 *
 * <p>For each component, the spec-level pod template is merged with the component's own
 * template and the component's resource is applied on top. When the component section is
 * absent from the spec, a clone of the spec-level template is used instead. If the JobManager
 * startup probe option is enabled, a startup probe is added to the JobManager template, and an
 * empty template is created first when none exists.
 *
 * <p>Each non-null template is handed to {@code createTempFile}; the returned string
 * (presumably the path of the written file) is stored under the matching pod template option.
 * When the TaskManager template equals the JobManager template, the JobManager's value is
 * reused rather than calling {@code createTempFile} a second time.
 *
 * @return this builder
 * @throws IOException propagated from the helpers invoked here (presumably {@code
 *     createTempFile})
 */
protected FlinkConfigBuilder applyPodTemplate() throws IOException {
    PodTemplateSpec sharedTemplate = spec.getPodTemplate();
    boolean byName =
            effectiveConfig.get(KubernetesOperatorConfigOptions.POD_TEMPLATE_MERGE_BY_NAME);

    // --- JobManager ---
    PodTemplateSpec jobManagerTemplate =
            spec.getJobManager() == null
                    ? ReconciliationUtils.clone(sharedTemplate)
                    : applyResourceToPodTemplate(
                            mergePodTemplates(
                                    sharedTemplate,
                                    spec.getJobManager().getPodTemplate(),
                                    byName),
                            spec.getJobManager().getResource());

    boolean startupProbeEnabled =
            effectiveConfig.get(
                    KubernetesOperatorConfigOptions.OPERATOR_JM_STARTUP_PROBE_ENABLED);
    if (startupProbeEnabled) {
        // The probe needs a template to attach to, so start from an empty one if necessary.
        if (jobManagerTemplate == null) {
            jobManagerTemplate = new PodTemplateSpec();
        }
        FlinkUtils.addStartupProbe(jobManagerTemplate);
    }

    String jobManagerTemplateFile = null;
    if (jobManagerTemplate != null) {
        jobManagerTemplateFile = createTempFile(jobManagerTemplate);
        effectiveConfig.set(
                KubernetesConfigOptions.JOB_MANAGER_POD_TEMPLATE, jobManagerTemplateFile);
    }

    // --- TaskManager ---
    PodTemplateSpec taskManagerTemplate =
            spec.getTaskManager() == null
                    ? ReconciliationUtils.clone(sharedTemplate)
                    : applyResourceToPodTemplate(
                            mergePodTemplates(
                                    sharedTemplate,
                                    spec.getTaskManager().getPodTemplate(),
                                    byName),
                            spec.getTaskManager().getResource());

    if (taskManagerTemplate == null) {
        // Nothing to register for the TaskManager.
        return this;
    }

    String taskManagerTemplateFile;
    if (taskManagerTemplate.equals(jobManagerTemplate)) {
        // Identical templates share the value already produced for the JobManager.
        taskManagerTemplateFile = jobManagerTemplateFile;
    } else {
        taskManagerTemplateFile = createTempFile(taskManagerTemplate);
    }
    effectiveConfig.set(
            KubernetesConfigOptions.TASK_MANAGER_POD_TEMPLATE, taskManagerTemplateFile);
    return this;
}