func requiresThermosConvert()

in pkg/jobmgr/util/handler/thermos.go [116:225]


func requiresThermosConvert(podSpec *pod.PodSpec) (bool, error) {
	if podSpec == nil {
		return false, nil
	}

	// Requires thermos data conversion, if MesosSpec inside PodSpec is
	// custom executor and executor data is empty.
	mesosSpec := podSpec.GetMesosSpec()
	if mesosSpec.GetExecutorSpec().GetType() != apachemesos.PodSpec_ExecutorSpec_EXECUTOR_TYPE_CUSTOM ||
		len(mesosSpec.GetExecutorSpec().GetData()) > 0 {
		return false, nil
	}

	containers := podSpec.GetContainers()
	if len(containers) == 0 {
		return false, nil
	}

	// Container image must be defined for main container
	mainContainer := containers[0]
	if len(mainContainer.GetImage()) == 0 {
		return false, yarpcerrors.InvalidArgumentErrorf("container image must be defined")
	}

	// Check if name of the containers are defined, and does not have duplicates
	containerNames := map[string]struct{}{}
	for _, c := range append(
		podSpec.GetInitContainers(),
		podSpec.GetContainers()...,
	) {
		n := c.GetName()
		if len(n) == 0 {
			return false, yarpcerrors.InvalidArgumentErrorf("container does not have name specified")
		}
		if _, ok := containerNames[n]; ok {
			return false, yarpcerrors.InvalidArgumentErrorf("duplicate name found in container names")
		}
		containerNames[n] = struct{}{}
	}

	// Verify volumes and build volumes map
	volumes := map[string]*volume.VolumeSpec{}
	for _, v := range podSpec.GetVolumes() {
		if len(v.GetName()) == 0 {
			return false, yarpcerrors.InvalidArgumentErrorf("volume does not have name specified")
		}

		if _, ok := volumes[v.GetName()]; ok {
			return false, yarpcerrors.InvalidArgumentErrorf("duplicate volume name found in pod")
		}

		switch v.GetType() {
		case volume.VolumeSpec_VOLUME_TYPE_EMPTY_DIR:
			return false, yarpcerrors.InvalidArgumentErrorf("empty dir volume type not supported for volume: %s", v.GetName())
		case volume.VolumeSpec_VOLUME_TYPE_HOST_PATH:
			if len(v.GetHostPath().GetPath()) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("path is empty for host_path volume")
			}
		case volume.VolumeSpec_VOLUME_TYPE_INVALID:
			return false, yarpcerrors.InvalidArgumentErrorf("invalid volume type for volume: %s", v.GetName())
		}

		volumes[v.GetName()] = v
	}

	// Verify all containers for volume mounts and environment variables
	envs := map[string]struct{}{}
	mounts := map[string]struct{}{}

	for _, c := range append(
		podSpec.GetInitContainers(),
		podSpec.GetContainers()...,
	) {
		// Verify volume mounts
		for _, m := range c.GetVolumeMounts() {
			if len(m.GetName()) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("volume mount does not specify volume name")
			}

			if len(m.GetMountPath()) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("volume mount does not specify mount path")
			}

			if _, ok := volumes[m.GetName()]; !ok {
				return false, yarpcerrors.InvalidArgumentErrorf("volume not defined: %s", m.GetName())
			}

			if _, ok := mounts[m.GetName()]; ok {
				return false, yarpcerrors.InvalidArgumentErrorf("duplicate volume mount not allowed")
			}

			mounts[m.GetName()] = struct{}{}
		}

		// Verify environment variables
		for _, e := range c.GetEnvironment() {
			if len(e.GetName()) == 0 {
				return false, yarpcerrors.InvalidArgumentErrorf("environment variable name not defined")
			}

			if _, ok := envs[e.GetName()]; ok {
				return false, yarpcerrors.InvalidArgumentErrorf("duplicate environment variable not allowed")
			}

			envs[e.GetName()] = struct{}{}
		}
	}

	return true, nil
}