in pkg/jobmgr/util/handler/thermos.go [116:225]
// requiresThermosConvert reports whether thermos data conversion is required
// for podSpec, validating the parts of the spec it inspects along the way.
//
// Conversion is required only when the MesosSpec inside podSpec uses a custom
// executor whose executor data is empty and the pod defines at least one
// container. In every other case, including a nil podSpec, it returns
// (false, nil) without performing any further validation.
//
// Once conversion is found to be applicable, an InvalidArgument error is
// returned (together with false) when:
//   - the first (main) container does not define an image;
//   - an init container or container has an empty or duplicate name;
//   - a volume has an empty or duplicate name, is of type EMPTY_DIR or
//     INVALID, or is a HOST_PATH volume with an empty path;
//   - a volume mount has no volume name or mount path, refers to a volume
//     that is not defined in the pod, or refers to a volume that was already
//     mounted by this or any other container of the pod;
//   - an environment variable has no name, or its name was already used by
//     this or any other container of the pod.
//
// If every check passes it returns (true, nil).
func requiresThermosConvert(podSpec *pod.PodSpec) (bool, error) {
	if podSpec == nil {
		return false, nil
	}

	// Requires thermos data conversion, if MesosSpec inside PodSpec is
	// custom executor and executor data is empty.
	executorSpec := podSpec.GetMesosSpec().GetExecutorSpec()
	if executorSpec.GetType() != apachemesos.PodSpec_ExecutorSpec_EXECUTOR_TYPE_CUSTOM ||
		len(executorSpec.GetData()) > 0 {
		return false, nil
	}

	containers := podSpec.GetContainers()
	if len(containers) == 0 {
		return false, nil
	}

	// Container image must be defined for main container
	if len(containers[0].GetImage()) == 0 {
		return false, yarpcerrors.InvalidArgumentErrorf("container image must be defined")
	}

	// Build the combined list of init containers followed by regular
	// containers once. The full slice expression caps the capacity of the
	// init container slice at its length, so append is forced to copy into
	// fresh storage instead of writing into spare capacity of the backing
	// array owned by podSpec.
	initContainers := podSpec.GetInitContainers()
	allContainers := append(
		initContainers[:len(initContainers):len(initContainers)],
		containers...,
	)

	// Check if name of the containers are defined, and does not have duplicates
	containerNames := make(map[string]struct{}, len(allContainers))
	for _, c := range allContainers {
		name := c.GetName()
		if len(name) == 0 {
			return false, yarpcerrors.InvalidArgumentErrorf("container does not have name specified")
		}
		if _, ok := containerNames[name]; ok {
			return false, yarpcerrors.InvalidArgumentErrorf("duplicate name found in container names")
		}
		containerNames[name] = struct{}{}
	}

	// Verify volumes and build volumes map
	volumes := map[string]*volume.VolumeSpec{}
	for _, v := range podSpec.GetVolumes() {
		name := v.GetName()
		if len(name) == 0 {
			return false, yarpcerrors.InvalidArgumentErrorf("volume does not have name specified")
		}
		if _, ok := volumes[name]; ok {
			return false, yarpcerrors.InvalidArgumentErrorf("duplicate volume name found in pod")
		}
		switch v.GetType() {
		case volume.VolumeSpec_VOLUME_TYPE_EMPTY_DIR:
			return false, yarpcerrors.InvalidArgumentErrorf("empty dir volume type not supported for volume: %s", name)
		case volume.VolumeSpec_VOLUME_TYPE_HOST_PATH:
			if len(v.GetHostPath().GetPath()) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("path is empty for host_path volume")
			}
		case volume.VolumeSpec_VOLUME_TYPE_INVALID:
			return false, yarpcerrors.InvalidArgumentErrorf("invalid volume type for volume: %s", name)
		}
		volumes[name] = v
	}

	// Verify all containers for volume mounts and environment variables.
	// Both sets are shared across every container of the pod, so a volume
	// mount or environment variable name may appear only once pod-wide.
	envs := map[string]struct{}{}
	mounts := map[string]struct{}{}
	for _, c := range allContainers {
		// Verify volume mounts
		for _, m := range c.GetVolumeMounts() {
			name := m.GetName()
			if len(name) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("volume mount does not specify volume name")
			}
			if len(m.GetMountPath()) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("volume mount does not specify mount path")
			}
			if _, ok := volumes[name]; !ok {
				return false, yarpcerrors.InvalidArgumentErrorf("volume not defined: %s", name)
			}
			if _, ok := mounts[name]; ok {
				return false, yarpcerrors.InvalidArgumentErrorf("duplicate volume mount not allowed")
			}
			mounts[name] = struct{}{}
		}

		// Verify environment variables
		for _, e := range c.GetEnvironment() {
			name := e.GetName()
			if len(name) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("environment variable name not defined")
			}
			if _, ok := envs[name]; ok {
				return false, yarpcerrors.InvalidArgumentErrorf("duplicate environment variable not allowed")
			}
			envs[name] = struct{}{}
		}
	}

	return true, nil
}