pkg/jobmgr/util/handler/thermos.go (612 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package handler import ( "encoding/json" "math" "sort" "strconv" "strings" mesos "github.com/uber/peloton/.gen/mesos/v1" "github.com/uber/peloton/.gen/peloton/api/v1alpha/job/stateless" "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod" "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod/apachemesos" "github.com/uber/peloton/.gen/peloton/api/v1alpha/volume" aurora "github.com/uber/peloton/.gen/thrift/aurora/api" "go.uber.org/yarpc/yarpcerrors" "github.com/uber/peloton/pkg/common/config" "github.com/uber/peloton/pkg/common/taskconfig" "github.com/uber/peloton/pkg/common/thermos" "github.com/uber/peloton/pkg/jobmgr/util/expansion" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" "go.uber.org/thriftrw/ptr" ) const ( _thermosContainerName = "thermos-container" _jobEnvironment = "us1.production" _auroraLabelPrefix = "org.apache.aurora.metadata." MbInBytes = 1024 * 1024 ) // portSpecByName sorts a list of port spec by name type portSpecByName []*pod.PortSpec func (p portSpecByName) Len() int { return len(p) } func (p portSpecByName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p portSpecByName) Less(i, j int) bool { return strings.Compare(p[i].GetName(), p[j].GetName()) < 0 } // ConvertForThermosExecutor takes JobSpec as an input, generates and attaches // thermos executor data if conversion could happen, and returns a mutated // version of JobSpec. func ConvertForThermosExecutor( jobSpec *stateless.JobSpec, thermosConfig config.ThermosExecutorConfig, ) (*stateless.JobSpec, error) { defaultSpec := jobSpec.GetDefaultSpec() convert, err := requiresThermosConvert(defaultSpec) if err != nil { return nil, err } if convert { newSpec, err := convertPodSpec( defaultSpec, defaultSpec, jobSpec, thermosConfig, ) if err != nil { return nil, err } jobSpec.DefaultSpec = newSpec } for instanceID, instanceSpec := range jobSpec.GetInstanceSpec() { mergedSpec := taskconfig.MergePodSpec(defaultSpec, instanceSpec) convert, err := requiresThermosConvert(mergedSpec) if err != nil { return nil, err } if convert { newSpec, err := convertPodSpec( instanceSpec, mergedSpec, jobSpec, thermosConfig, ) if err != nil { return nil, err } jobSpec.InstanceSpec[instanceID] = newSpec } } return jobSpec, nil } // requiresThermosConvert checks if the Peloton PodSpec requires thermos // executor conversion. Throws an error if the PodSpec requires conversion, // but fails validation. func requiresThermosConvert(podSpec *pod.PodSpec) (bool, error) { if podSpec == nil { return false, nil } // Requires thermos data conversion, if MesosSpec inside PodSpec is // custom executor and executor data is empty. mesosSpec := podSpec.GetMesosSpec() if mesosSpec.GetExecutorSpec().GetType() != apachemesos.PodSpec_ExecutorSpec_EXECUTOR_TYPE_CUSTOM || len(mesosSpec.GetExecutorSpec().GetData()) > 0 { return false, nil } containers := podSpec.GetContainers() if len(containers) == 0 { return false, nil } // Container image must be defined for main container mainContainer := containers[0] if len(mainContainer.GetImage()) == 0 { return false, yarpcerrors.InvalidArgumentErrorf("container image must be defined") } // Check if name of the containers are defined, and does not have duplicates containerNames := map[string]struct{}{} for _, c := range append( podSpec.GetInitContainers(), podSpec.GetContainers()..., ) { n := c.GetName() if len(n) == 0 { return false, yarpcerrors.InvalidArgumentErrorf("container does not have name specified") } if _, ok := containerNames[n]; ok { return false, yarpcerrors.InvalidArgumentErrorf("duplicate name found in container names") } containerNames[n] = struct{}{} } // Verify volumes and build volumes map volumes := map[string]*volume.VolumeSpec{} for _, v := range podSpec.GetVolumes() { if len(v.GetName()) == 0 { return false, yarpcerrors.InvalidArgumentErrorf("volume does not have name specified") } if _, ok := volumes[v.GetName()]; ok { return false, yarpcerrors.InvalidArgumentErrorf("duplicate volume name found in pod") } switch v.GetType() { case volume.VolumeSpec_VOLUME_TYPE_EMPTY_DIR: return false, yarpcerrors.InvalidArgumentErrorf("empty dir volume type not supported for volume: %s", v.GetName()) case volume.VolumeSpec_VOLUME_TYPE_HOST_PATH: if len(v.GetHostPath().GetPath()) == 0 { return false, yarpcerrors.InvalidArgumentErrorf("path is empty for host_path volume") } case volume.VolumeSpec_VOLUME_TYPE_INVALID: return false, yarpcerrors.InvalidArgumentErrorf("invalid volume type for volume: %s", v.GetName()) } volumes[v.GetName()] = v } // Verify all containers for volume mounts and environment variables envs := map[string]struct{}{} mounts := map[string]struct{}{} for _, c := range append( podSpec.GetInitContainers(), podSpec.GetContainers()..., ) { // Verify volume mounts for _, m := range c.GetVolumeMounts() { if len(m.GetName()) == 0 { return false, yarpcerrors.InvalidArgumentErrorf("volume mount does not specify volume name") } if len(m.GetMountPath()) == 0 { return false, yarpcerrors.InvalidArgumentErrorf("volume mount does not specify mount path") } if _, ok := volumes[m.GetName()]; !ok { return false, yarpcerrors.InvalidArgumentErrorf("volume not defined: %s", m.GetName()) } if _, ok := mounts[m.GetName()]; ok { return false, yarpcerrors.InvalidArgumentErrorf("duplicate volume mount not allowed") } mounts[m.GetName()] = struct{}{} } // Verify environment variables for _, e := range c.GetEnvironment() { if len(e.GetName()) == 0 { return false, yarpcerrors.InvalidArgumentErrorf("environment variable name not defined") } if _, ok := envs[e.GetName()]; ok { return false, yarpcerrors.InvalidArgumentErrorf("duplicate environment variable not allowed") } envs[e.GetName()] = struct{}{} } } return true, nil } // collectResources collects resources (including ports) from all containers // in the PodSpec. func collectResources(podSpec *pod.PodSpec) (*pod.ResourceSpec, []*pod.PortSpec) { // Collect maximum resource and ports allocated (key'ed by port name) // by initial containers. maxInitRes := &pod.ResourceSpec{} initPorts := make(map[string]*pod.PortSpec) for _, initContainer := range podSpec.GetInitContainers() { res := initContainer.GetResource() if res.GetCpuLimit() > maxInitRes.GetCpuLimit() { maxInitRes.CpuLimit = res.CpuLimit } if res.GetMemLimitMb() > maxInitRes.GetMemLimitMb() { maxInitRes.MemLimitMb = res.MemLimitMb } if res.GetDiskLimitMb() > maxInitRes.GetDiskLimitMb() { maxInitRes.DiskLimitMb = res.DiskLimitMb } if res.GetFdLimit() > maxInitRes.GetFdLimit() { maxInitRes.FdLimit = res.FdLimit } if res.GetGpuLimit() > maxInitRes.GetGpuLimit() { maxInitRes.GpuLimit = res.GpuLimit } for _, port := range initContainer.GetPorts() { if _, ok := initPorts[port.GetName()]; !ok { initPorts[port.GetName()] = port } } } // Collect sum of resources and ports allocated (key'ed by port name) // by containers sumRes := &pod.ResourceSpec{} ports := make(map[string]*pod.PortSpec) for _, container := range podSpec.GetContainers() { res := container.GetResource() sumRes.CpuLimit = sumRes.GetCpuLimit() + res.GetCpuLimit() sumRes.MemLimitMb = sumRes.GetMemLimitMb() + res.GetMemLimitMb() sumRes.DiskLimitMb = sumRes.GetDiskLimitMb() + res.GetDiskLimitMb() sumRes.FdLimit = sumRes.GetFdLimit() + res.GetFdLimit() sumRes.GpuLimit = sumRes.GetGpuLimit() + res.GetGpuLimit() for _, port := range container.GetPorts() { if _, ok := ports[port.GetName()]; !ok { ports[port.GetName()] = port } } } // Returned resource would be max of (maxInitRes, sumRes) // Returned ports would be merged list of (initPorts, ports) resultRes := &pod.ResourceSpec{ CpuLimit: math.Max(maxInitRes.GetCpuLimit(), sumRes.GetCpuLimit()), MemLimitMb: math.Max(maxInitRes.GetMemLimitMb(), sumRes.GetMemLimitMb()), DiskLimitMb: math.Max(maxInitRes.GetDiskLimitMb(), sumRes.GetDiskLimitMb()), GpuLimit: math.Max(maxInitRes.GetGpuLimit(), sumRes.GetGpuLimit()), // Using a function here since math.Max only supports float64 FdLimit: func() uint32 { if maxInitRes.GetFdLimit() > sumRes.GetFdLimit() { return maxInitRes.GetFdLimit() } return sumRes.GetFdLimit() }(), } portsMap := make(map[string]*pod.PortSpec) for n, p := range initPorts { if _, ok := portsMap[n]; !ok { portsMap[n] = p } } for n, p := range ports { if _, ok := portsMap[n]; !ok { portsMap[n] = p } } resultPorts := make([]*pod.PortSpec, 0, len(portsMap)) for _, p := range portsMap { resultPorts = append(resultPorts, p) } // Make sure the order of the ports are consistent to avoid // unnecessary job restarts. sort.Stable(portSpecByName(resultPorts)) return resultRes, resultPorts } // createDockerInfo create mesos DockerInfo struct from ContainerSpec func createDockerInfo(podSpec *pod.PodSpec) *mesos.ContainerInfo_DockerInfo { // TODO: Validate parameters are set in ContainerSpec // TODO: Right now we are overriding all the custom docker // parameters passed in, thus we need to figure out some way to support // thinsg like ulimit, cap-add, pids-limit etc. var params []*mesos.Parameter mainContainer := podSpec.GetContainers()[0] // Build volumes map volumes := map[string]*volume.VolumeSpec{} for _, v := range podSpec.GetVolumes() { volumes[v.GetName()] = v } for _, c := range append( podSpec.GetInitContainers(), podSpec.GetContainers()..., ) { // Generate docker environment parameters for _, env := range c.GetEnvironment() { params = append(params, &mesos.Parameter{ Key: ptr.String("env"), Value: ptr.String(env.GetName() + "=" + env.GetValue()), }) } // Generate docker volume parameters for _, mount := range c.GetVolumeMounts() { var param *mesos.Parameter v := volumes[mount.GetName()] switch v.GetType() { case volume.VolumeSpec_VOLUME_TYPE_HOST_PATH: value := v.GetHostPath().GetPath() + ":" + mount.GetMountPath() + ":" if mount.GetReadOnly() { value += "ro" } else { value += "rw" } param = &mesos.Parameter{ Key: ptr.String("volume"), Value: ptr.String(value), } } if param != nil { params = append(params, param) } } } return &mesos.ContainerInfo_DockerInfo{ Image: ptr.String(mainContainer.GetImage()), Parameters: params, } } // convert converts Peloton PodSpec to Aurora TaskConfig thrift structure. func convert( jobSpec *stateless.JobSpec, podSpec *pod.PodSpec, ) (*aurora.TaskConfig, error) { collectedRes, collectedPorts := collectResources(podSpec) // tier tier := convertTier(jobSpec) // resources var resources []*aurora.Resource if collectedRes.GetCpuLimit() > 0 { resources = append(resources, &aurora.Resource{ NumCpus: ptr.Float64(collectedRes.GetCpuLimit()), }) } if collectedRes.GetMemLimitMb() > 0 { resources = append(resources, &aurora.Resource{ RamMb: ptr.Int64(int64(collectedRes.GetMemLimitMb())), }) } if collectedRes.GetDiskLimitMb() > 0 { resources = append(resources, &aurora.Resource{ DiskMb: ptr.Int64(int64(collectedRes.GetDiskLimitMb())), }) } if collectedRes.GetGpuLimit() > 0 { resources = append(resources, &aurora.Resource{ NumGpus: ptr.Int64(int64(collectedRes.GetGpuLimit())), }) } for _, port := range collectedPorts { resources = append(resources, &aurora.Resource{ NamedPort: ptr.String(port.GetName()), }) } // metadata metadata := make([]*aurora.Metadata, 0, len(podSpec.GetLabels())) for _, label := range podSpec.GetLabels() { key := label.GetKey() value := label.GetValue() // Aurora attaches "org.apache.aurora.metadata." prefix when // translating job metadata to mesos task label, reverting the // behavior here. if strings.HasPrefix(key, _auroraLabelPrefix) { key = strings.TrimPrefix(key, _auroraLabelPrefix) } metadata = append(metadata, &aurora.Metadata{ Key: ptr.String(key), Value: ptr.String(value), }) } // container docker := createDockerInfo(podSpec) dockerParams := make([]*aurora.DockerParameter, 0, len(docker.GetParameters())) for _, param := range docker.GetParameters() { dockerParams = append(dockerParams, &aurora.DockerParameter{ Name: ptr.String(param.GetKey()), Value: ptr.String(param.GetValue()), }) } container := &aurora.Container{ Docker: &aurora.DockerContainer{ Image: ptr.String(docker.GetImage()), Parameters: dockerParams, }, } // executor_data executorData, err := convertExecutorData(jobSpec, podSpec) if err != nil { return nil, errors.Wrap(err, "failed to convert pod spec to executor data") } executorDataStr, err := json.Marshal(executorData) if err != nil { return nil, errors.Wrap(err, "failed to marshal executor data") } // task_config t := &aurora.TaskConfig{ Job: &aurora.JobKey{ Role: ptr.String(jobSpec.GetName()), Environment: ptr.String(_jobEnvironment), Name: ptr.String(jobSpec.GetName() + "." + jobSpec.GetName()), }, IsService: ptr.Bool(true), Priority: ptr.Int32(int32(jobSpec.GetSla().GetPriority())), Production: ptr.Bool(false), Tier: ptr.String(tier), Resources: resources, Metadata: metadata, Container: container, ExecutorConfig: &aurora.ExecutorConfig{ Name: ptr.String("AuroraExecutor"), Data: ptr.String(string(executorDataStr)), }, // TODO: Fill MaxTaskFailures } if len(jobSpec.GetOwner()) > 0 { t.Owner = &aurora.Identity{User: ptr.String(jobSpec.GetOwner())} } return t, nil } // convertHealthCheckConfig generates HealthCheckConfig struct based on // mainContainer's livenessCheck config. func convertHealthCheckConfig( envs []*pod.Environment, livenessCheck *pod.HealthCheckSpec, ) (*HealthCheckConfig, error) { if !livenessCheck.GetEnabled() { return nil, nil } healthCheckConfig := NewHealthCheckConfig() switch livenessCheck.GetType() { case pod.HealthCheckSpec_HEALTH_CHECK_TYPE_COMMAND: healthCheckConfig.HealthChecker = NewHealthCheckerConfig() healthCheckConfig.HealthChecker.Shell = NewShellHealthChecker() healthCheckConfig.HealthChecker.Shell.ShellCommand = ptr.String(convertCmdline(envs, livenessCheck.GetCommand())) case pod.HealthCheckSpec_HEALTH_CHECK_TYPE_HTTP: var schema string if len(livenessCheck.GetHttpGet().GetScheme()) > 0 { schema = livenessCheck.GetHttpGet().GetScheme() } else { schema = "http" } endpoint := schema + "://127.0.0.1" if livenessCheck.GetHttpGet().GetPort() > 0 { endpoint += ":" + strconv.Itoa(int(livenessCheck.GetHttpGet().GetPort())) } endpoint += livenessCheck.GetHttpGet().GetPath() healthCheckConfig.HealthChecker = NewHealthCheckerConfig() healthCheckConfig.HealthChecker.Http = NewHttpHealthChecker() healthCheckConfig.HealthChecker.Http.Endpoint = ptr.String(endpoint) default: return nil, yarpcerrors.InvalidArgumentErrorf("unsupported liveness check type: %s", livenessCheck.GetType()) } if livenessCheck.GetInitialIntervalSecs() > 0 { healthCheckConfig.InitialIntervalSecs = ptr.Float64(float64(livenessCheck.GetInitialIntervalSecs())) } if livenessCheck.GetIntervalSecs() > 0 { healthCheckConfig.IntervalSecs = ptr.Float64(float64(livenessCheck.GetIntervalSecs())) } if livenessCheck.GetMaxConsecutiveFailures() > 0 { healthCheckConfig.MaxConsecutiveFailures = ptr.Int32(int32(livenessCheck.GetMaxConsecutiveFailures())) } if livenessCheck.GetSuccessThreshold() > 0 { healthCheckConfig.MinConsecutiveSuccesses = ptr.Int32(int32(livenessCheck.GetSuccessThreshold())) } if livenessCheck.GetTimeoutSecs() > 0 { healthCheckConfig.TimeoutSecs = ptr.Float64(float64(livenessCheck.GetTimeoutSecs())) } return healthCheckConfig, nil } // convertTask generates Task struct based on JobSpec, PodSpec and ResourceSpec. func convertTask(jobSpec *stateless.JobSpec, podSpec *pod.PodSpec, res *pod.ResourceSpec) *Task { task := NewTask() task.Name = ptr.String(jobSpec.GetName()) if podSpec.GetKillGracePeriodSeconds() > 0 { task.FinalizationWait = ptr.Int32(int32(podSpec.GetKillGracePeriodSeconds())) } task.Resources = NewResources() if res.GetCpuLimit() > 0 { task.Resources.Cpu = ptr.Float64(res.GetCpuLimit()) } if res.GetMemLimitMb() > 0 { task.Resources.RamBytes = ptr.Int64(int64(res.GetMemLimitMb() * MbInBytes)) } if res.GetDiskLimitMb() > 0 { task.Resources.DiskBytes = ptr.Int64(int64(res.GetDiskLimitMb() * MbInBytes)) } if res.GetGpuLimit() > 0 { task.Resources.Gpu = ptr.Int32(int32(res.GetGpuLimit())) } // Start init containers in order defined, and all init containers before // regular containers for _, c := range podSpec.GetContainers() { constraint := NewConstraint() for _, ic := range podSpec.GetInitContainers() { constraint.Order = append(constraint.Order, ptr.String(ic.GetName())) } constraint.Order = append(constraint.Order, ptr.String(c.GetName())) task.Constraints = append(task.Constraints, constraint) } // Convert ContainerSpecs to Processes for _, c := range append( podSpec.GetInitContainers(), podSpec.GetContainers()..., ) { process := NewProcess() process.Name = ptr.String(c.GetName()) process.Cmdline = ptr.String(convertCmdline(c.GetEnvironment(), c.GetEntrypoint())) task.Processes = append(task.Processes, process) } return task } // convertExecutorData generates ExecutorData struct based on Peloton PodSpec. func convertExecutorData( jobSpec *stateless.JobSpec, podSpec *pod.PodSpec, ) (*ExecutorData, error) { mainContainer := podSpec.GetContainers()[0] collectedRes, _ := collectResources(podSpec) // health_check_config healthCheckConfig, err := convertHealthCheckConfig( mainContainer.GetEnvironment(), mainContainer.GetLivenessCheck(), ) if err != nil { return nil, err } // task task := convertTask(jobSpec, podSpec, collectedRes) executorData := NewExecutorData() executorData.Role = ptr.String(jobSpec.GetName()) executorData.Environment = ptr.String(_jobEnvironment) executorData.Name = ptr.String(jobSpec.GetName() + "." + jobSpec.GetName()) executorData.Priority = ptr.Int32(int32(jobSpec.GetSla().GetPriority())) executorData.Production = ptr.Bool(false) executorData.Tier = ptr.String(convertTier(jobSpec)) executorData.CronCollisionPolicy = ptr.String("KILL_EXISTING") executorData.MaxTaskFailures = ptr.Int32(1) executorData.EnableHooks = ptr.Bool(false) executorData.HealthCheckConfig = healthCheckConfig executorData.Task = task // TODO: Fill Cluster return executorData, nil } // convertTier returns tier string based on input Peloton JobSpec. func convertTier(jobSpec *stateless.JobSpec) string { if jobSpec.GetSla().GetPreemptible() && jobSpec.GetSla().GetRevocable() { return "revocable" } return "preemptible" } // convertCmdline returns the command line string based on input Peloton // CommandSpec. It replaces variables inside command arguments with actual // values which defined in container environments, and wrap it in a single // quote. func convertCmdline( envs []*pod.Environment, command *pod.CommandSpec, ) string { envMap := map[string]string{} for _, env := range envs { envMap[env.GetName()] = env.GetValue() } mapping := expansion.MappingFuncFor(envMap) var cmd []string for _, arg := range append( []string{command.GetValue()}, command.GetArguments()..., ) { // Each cmdline arg is generated in following steps: // 1. Do kubernetes style environment variable expansion, i.e. // $(ENV) -> env-value, $$(ENV) -> $(ENV) // 2. Escape single quote characters by ending currently quoted // string append single quote, then start a newly quoted string, // i.e. ' -> '"'"' // 3. Wrap the whole argument in single quote cmd = append(cmd, "'"+strings.Replace(expansion.Expand(arg, mapping), "'", `'"'"'`, -1)+"'") } return strings.Join(cmd, " ") } // convertToData converts Peloton PodSpec to thermos executor data // encoded in thrift binary protocol. func convertToData( jobSpec *stateless.JobSpec, podSpec *pod.PodSpec, ) ([]byte, error) { taskConfig, err := convert(jobSpec, podSpec) if err != nil { return nil, err } taskConfigBytes, err := thermos.EncodeTaskConfig(taskConfig) if err != nil { return nil, err } return taskConfigBytes, nil } // mutatePodSpec mutates Peloton PodSpec to be usable by v1alpha API handler // along with generated thermos executor data. func mutatePodSpec( spec *pod.PodSpec, executorData []byte, thermosConfig config.ThermosExecutorConfig, ) error { collectedRes, collectedPorts := collectResources(spec) // Generate DockerInfo, CommandInfo and ExecutorInfo in Mesos v1 struct, // and convert them to the new fields. dockerInfo := createDockerInfo(spec) var dockerParameters []*apachemesos.PodSpec_DockerParameter for _, p := range dockerInfo.GetParameters() { dockerParameters = append(dockerParameters, &apachemesos.PodSpec_DockerParameter{ Key: p.GetKey(), Value: p.GetValue(), }) } commandInfo := thermosConfig.NewThermosCommandInfo() var uris []*apachemesos.PodSpec_URI for _, u := range commandInfo.GetUris() { uris = append(uris, &apachemesos.PodSpec_URI{ Value: u.GetValue(), Executable: u.GetExecutable(), }) } executorInfo := thermosConfig.NewThermosExecutorInfo(executorData) var executorType apachemesos.PodSpec_ExecutorSpec_ExecutorType executorResources := &apachemesos.PodSpec_ExecutorSpec_Resources{} switch executorInfo.GetType() { case mesos.ExecutorInfo_DEFAULT: executorType = apachemesos.PodSpec_ExecutorSpec_EXECUTOR_TYPE_DEFAULT case mesos.ExecutorInfo_CUSTOM: executorType = apachemesos.PodSpec_ExecutorSpec_EXECUTOR_TYPE_CUSTOM default: executorType = apachemesos.PodSpec_ExecutorSpec_EXECUTOR_TYPE_INVALID } for _, r := range executorInfo.GetResources() { if r.GetType() == mesos.Value_SCALAR { if r.GetName() == "cpus" { executorResources.Cpu = r.GetScalar().GetValue() } if r.GetName() == "mem" { executorResources.MemMb = r.GetScalar().GetValue() } } } // Attach fields to PodSpec if spec.MesosSpec == nil { spec.MesosSpec = &apachemesos.PodSpec{} } spec.MesosSpec.Type = apachemesos.PodSpec_CONTAINER_TYPE_DOCKER spec.MesosSpec.DockerParameters = dockerParameters spec.MesosSpec.Uris = uris spec.MesosSpec.Shell = commandInfo.GetShell() spec.MesosSpec.ExecutorSpec = &apachemesos.PodSpec_ExecutorSpec{ Type: executorType, ExecutorId: executorInfo.GetExecutorId().GetValue(), Data: executorInfo.GetData(), Resources: executorResources, } mainContainer := spec.GetContainers()[0] mainContainer.Name = _thermosContainerName mainContainer.Image = dockerInfo.GetImage() mainContainer.Entrypoint = &pod.CommandSpec{ Value: commandInfo.GetValue(), } mainContainer.Resource = collectedRes mainContainer.Ports = collectedPorts // Health check would be performed by Thermos Executor mainContainer.LivenessCheck = nil mainContainer.ReadinessCheck = nil // Clear out deprecated fields to avoid conflict mainContainer.Command = nil mainContainer.Container = nil mainContainer.Executor = nil // Environment and Volume has been translated to DockerParameters, // clear to avoid conflict. mainContainer.Environment = nil mainContainer.VolumeMounts = nil spec.Volumes = nil // Clear out init containers, since we expect those have already been // converted to processes in executor data spec.InitContainers = nil // Attach mutated container spec spec.Containers = []*pod.ContainerSpec{mainContainer} return nil } // convertPodSpec creates a copy of srcPodSpec which gets passed in, uses // fullPodSpec (created by merging default and instance spec) to generate // executor data and mutates the copy of srcPodSpec with thermos executor // attached (along with a couple of other fields changed). func convertPodSpec( srcPodSpec *pod.PodSpec, fullPodSpec *pod.PodSpec, jobSpec *stateless.JobSpec, thermosConfig config.ThermosExecutorConfig, ) (*pod.PodSpec, error) { newSpec := proto.Clone(srcPodSpec).(*pod.PodSpec) data, err := convertToData(jobSpec, fullPodSpec) if err != nil { return nil, err } err = mutatePodSpec(newSpec, data, thermosConfig) if err != nil { return nil, err } return newSpec, nil }