in pkg/apis/frameworkcontroller/v1/funcs.go [545:661]
// NewPod builds the Pod for the Task identified by taskRoleName and taskIndex.
//
// The Pod is derived from the Task.Pod template of the TaskRole spec: the
// Placeholders in the template are replaced, the name and namespace are
// overridden, and the result is augmented with a controller OwnerReference
// pointing at cm, the Framework/Task annotations and labels, and the predefined
// environment variables for every container and init container.
func (f *Framework) NewPod(cm *core.ConfigMap, taskRoleName string, taskIndex int32) *core.Pod {
	// Work on the Json form of Task.Pod, so the spec held by the Framework is
	// never modified (the Json round trip acts as a deep copy).
	templateJSON := common.ToJson(f.TaskRoleSpec(taskRoleName).Task.Pod)
	roleStatus := f.TaskRoleStatus(taskRoleName)
	status := f.TaskStatus(taskRoleName, taskIndex)

	// String forms of every identifier that is exposed through annotations,
	// labels and environment variables below.
	indexStr := fmt.Sprint(taskIndex)
	fwUID := string(f.UID)
	fwAttemptID := fmt.Sprint(f.FrameworkAttemptID())
	fwAttemptInstanceUID := string(*f.FrameworkAttemptInstanceUID())
	cmUID := string(*f.ConfigMapUID())
	roleUID := string(roleStatus.InstanceUID)
	taskUID := string(status.InstanceUID)
	attemptID := fmt.Sprint(status.TaskAttemptID())
	// Built from a reference to the Pod UID environment variable instead of a
	// concrete Pod UID.
	attemptInstanceUIDRefer := string(*GetTaskAttemptInstanceUID(
		status.TaskAttemptID(),
		common.PtrUIDStr(common.ReferEnvVar(EnvNamePodUID))))

	// Substitute the Placeholders directly on the Json text, so that a single
	// Placeholder is never broken into multiple lines.
	replacer := strings.NewReplacer(
		common.ReferPlaceholder(PlaceholderFrameworkNamespace), f.Namespace,
		common.ReferPlaceholder(PlaceholderFrameworkName), f.Name,
		common.ReferPlaceholder(PlaceholderTaskRoleName), taskRoleName,
		common.ReferPlaceholder(PlaceholderTaskIndex), indexStr,
		common.ReferPlaceholder(PlaceholderConfigMapName), f.ConfigMapName(),
		common.ReferPlaceholder(PlaceholderPodName), status.PodName())
	var resolved core.PodTemplateSpec
	common.FromJson(replacer.Replace(templateJSON), &resolved)

	// Name and namespace are always dictated by the Framework, regardless of
	// what the template specified.
	pod := &core.Pod{}
	pod.ObjectMeta = resolved.ObjectMeta
	pod.Spec = resolved.Spec
	pod.Name = status.PodName()
	pod.Namespace = f.Namespace

	// The ConfigMap is registered as the controller owner of the Pod. The Pod
	// functions normally with such an owner, except that its detail cannot be
	// viewed from the k8s dashboard, which shows the error:
	// "Unknown reference kind ConfigMap".
	// See https://github.com/kubernetes/dashboard/issues/3158
	controllerRef := meta.NewControllerRef(cm, ConfigMapGroupVersionKind)
	pod.OwnerReferences = append(pod.OwnerReferences, *controllerRef)

	if pod.Annotations == nil {
		pod.Annotations = map[string]string{}
	}
	for _, kv := range [][2]string{
		{AnnotationKeyFrameworkNamespace, f.Namespace},
		{AnnotationKeyFrameworkName, f.Name},
		{AnnotationKeyTaskRoleName, taskRoleName},
		{AnnotationKeyTaskIndex, indexStr},
		{AnnotationKeyConfigMapName, f.ConfigMapName()},
		{AnnotationKeyPodName, pod.Name},
		{AnnotationKeyFrameworkUID, fwUID},
		{AnnotationKeyFrameworkAttemptID, fwAttemptID},
		{AnnotationKeyFrameworkAttemptInstanceUID, fwAttemptInstanceUID},
		{AnnotationKeyConfigMapUID, cmUID},
		{AnnotationKeyTaskRoleUID, roleUID},
		{AnnotationKeyTaskUID, taskUID},
		{AnnotationKeyTaskAttemptID, attemptID},
	} {
		pod.Annotations[kv[0]] = kv[1]
	}

	if pod.Labels == nil {
		pod.Labels = map[string]string{}
	}
	for _, kv := range [][2]string{
		{LabelKeyFrameworkName, f.Name},
		{LabelKeyTaskRoleName, taskRoleName},
		{LabelKeyTaskIndex, indexStr},
	} {
		pod.Labels[kv[0]] = kv[1]
	}

	// The order is significant: the Pod UID variable is declared before the
	// TaskAttemptInstanceUID variable, whose value is built from a reference
	// to it.
	predefinedEnvs := []core.EnvVar{
		{Name: EnvNameFrameworkNamespace, Value: f.Namespace},
		{Name: EnvNameFrameworkName, Value: f.Name},
		{Name: EnvNameTaskRoleName, Value: taskRoleName},
		{Name: EnvNameTaskIndex, Value: indexStr},
		{Name: EnvNameConfigMapName, Value: f.ConfigMapName()},
		{Name: EnvNamePodName, Value: pod.Name},
		{Name: EnvNameFrameworkUID, Value: fwUID},
		{Name: EnvNameFrameworkAttemptID, Value: fwAttemptID},
		{Name: EnvNameFrameworkAttemptInstanceUID, Value: fwAttemptInstanceUID},
		{Name: EnvNameConfigMapUID, Value: cmUID},
		{Name: EnvNameTaskRoleUID, Value: roleUID},
		{Name: EnvNameTaskUID, Value: taskUID},
		{Name: EnvNameTaskAttemptID, Value: attemptID},
		{Name: EnvNamePodUID, ValueFrom: ObjectUIDEnvVarSource},
		{Name: EnvNameTaskAttemptInstanceUID, Value: attemptInstanceUIDRefer},
	}

	// augment applies the same two adjustments to every given container:
	//
	// 1. The predefinedEnvs are put in front of the container's own environment
	//    variables, so that the variables specified in the spec can refer to
	//    them. Each container gets its own freshly allocated slice.
	// 2. An unset TerminationMessagePolicy is defaulted to
	//    TerminationMessageFallbackToLogsOnError, in case cluster-level logging
	//    has not been set up for the cluster.
	//    See https://kubernetes.io/docs/concepts/cluster-administration/logging
	//    This is safe, because the fallback to the tail of the log only happens
	//    if the container failed and the termination message file specified by
	//    terminationMessagePath is not found or empty.
	augment := func(containers []core.Container) {
		for i := range containers {
			c := &containers[i]
			envs := make([]core.EnvVar, 0, len(predefinedEnvs)+len(c.Env))
			envs = append(envs, predefinedEnvs...)
			c.Env = append(envs, c.Env...)
			if c.TerminationMessagePolicy == "" {
				c.TerminationMessagePolicy = core.TerminationMessageFallbackToLogsOnError
			}
		}
	}
	augment(pod.Spec.Containers)
	augment(pod.Spec.InitContainers)

	return pod
}