pkg/aurorabridge/atop/pod_spec.go (221 lines of code) (raw):

// Copyright (c) 2019 Uber Technologies, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package atop import ( "fmt" "path" "strings" "github.com/uber/peloton/.gen/peloton/api/v1alpha/peloton" "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod" "github.com/uber/peloton/.gen/peloton/api/v1alpha/pod/apachemesos" "github.com/uber/peloton/.gen/peloton/api/v1alpha/volume" "github.com/uber/peloton/.gen/thrift/aurora/api" "github.com/uber/peloton/pkg/aurorabridge/common" "github.com/uber/peloton/pkg/aurorabridge/label" "github.com/uber/peloton/pkg/common/config" "github.com/uber/peloton/pkg/common/thermos" ) // NewPodSpec creates a new PodSpec. func NewPodSpec( t *api.TaskConfig, c config.ThermosExecutorConfig, ) (*pod.PodSpec, error) { // Taking aurora TaskConfig struct from JobUpdateRequest, and // serialize it using Thrift binary protocol. The resulting // byte array will be attached to ExecutorInfo.Data. // // After task placement, jobmgr will deserialize the byte array, // combine it with placement info, and generates aurora AssignedTask // struct, which will be eventually be consumed by thermos executor. // // Leaving EncodeTaskConfig at the top since it has the side-effect // of sorting all the "list" fields. executorData, err := thermos.EncodeTaskConfig(t) if err != nil { return nil, fmt.Errorf("encode task config: %s", err) } jobKeyLabel := label.NewAuroraJobKey(t.GetJob()) metadataLabels := label.NewAuroraMetadataLabels(t.GetMetadata()) labels := append([]*peloton.Label{ common.BridgePodLabel, jobKeyLabel, }, metadataLabels...) gpuLimit, err := label.GetUdeployGpuLimit(t.GetMetadata()) if err != nil { return nil, fmt.Errorf("failed to parse udeploy gpu limit label: %s", err) } constraint, err := NewConstraint(jobKeyLabel, t.GetConstraints()) if err != nil { return nil, fmt.Errorf("new constraint: %s", err) } volumes, volumeMounts := newVolumes(t.GetContainer()) image, err := newImage(t.GetContainer()) if err != nil { return nil, err } return &pod.PodSpec{ PodName: nil, // Unused. Labels: labels, InitContainers: nil, // Unused. Containers: []*pod.ContainerSpec{{ Name: "", // Unused. Resource: newResourceSpec(t.GetResources(), gpuLimit), LivenessCheck: nil, // Unused, ReadinessCheck: nil, // Unused. Ports: newPortSpecs(t.GetResources()), Entrypoint: newEntryPoint(c), Image: image, VolumeMounts: volumeMounts, }}, Constraint: constraint, RestartPolicy: nil, // Unused. Volume: nil, // Unused. PreemptionPolicy: nil, // Unused. Controller: false, // Unused. KillGracePeriodSeconds: 0, // Unused. Revocable: t.GetTier() == common.Revocable, Volumes: volumes, MesosSpec: newMesosPodSpec(t.GetContainer(), c, executorData), }, nil } func newResourceSpec(rs []*api.Resource, gpuLimit *float64) *pod.ResourceSpec { if len(rs) == 0 { return nil } result := &pod.ResourceSpec{} for _, r := range rs { if r.IsSetNumCpus() { result.CpuLimit = r.GetNumCpus() } if r.IsSetRamMb() { result.MemLimitMb = float64(r.GetRamMb()) } if r.IsSetDiskMb() { result.DiskLimitMb = float64(r.GetDiskMb()) } if r.IsSetNumGpus() { result.GpuLimit = float64(r.GetNumGpus()) } // Note: Aurora API does not include fd_limit. } if gpuLimit != nil { result.GpuLimit = *gpuLimit } return result } func newPortSpecs(rs []*api.Resource) []*pod.PortSpec { var result []*pod.PortSpec for _, r := range rs { if r.IsSetNamedPort() { result = append(result, &pod.PortSpec{Name: r.GetNamedPort()}) } } return result } func newImage(c *api.Container) (string, error) { if c == nil { return "", nil } if c.IsSetMesos() { if !c.GetMesos().IsSetImage() { return "", nil } if c.GetMesos().GetImage().IsSetDocker() { return fmt.Sprintf("%s:%s", c.GetMesos().GetImage().GetDocker().GetName(), c.GetMesos().GetImage().GetDocker().GetTag(), ), nil } return "", fmt.Errorf("invalid mesos image type appc") } if c.IsSetDocker() { return c.GetDocker().GetImage(), nil } return "", fmt.Errorf("only docker and mesos containerizers are supported") } func newEntryPoint(c config.ThermosExecutorConfig) *pod.CommandSpec { var b strings.Builder b.WriteString("${MESOS_SANDBOX=.}/") b.WriteString(path.Base(c.Path)) b.WriteString(" ") b.WriteString(c.Flags) command := strings.TrimSpace(b.String()) result := &pod.CommandSpec{ Value: command, } return result } func newMesosPodSpec( c *api.Container, t config.ThermosExecutorConfig, executorData []byte, ) *apachemesos.PodSpec { if c == nil { return nil } result := &apachemesos.PodSpec{} if c.IsSetMesos() { result.Type = apachemesos.PodSpec_CONTAINER_TYPE_MESOS } if c.IsSetDocker() { result.Type = apachemesos.PodSpec_CONTAINER_TYPE_DOCKER result.DockerParameters = newDockerParameters( c.GetDocker().GetParameters()) result.NetworkSpec = &apachemesos.PodSpec_NetworkSpec{ Type: apachemesos.PodSpec_NetworkSpec_NETWORK_TYPE_HOST, } } // Fill the URI resourcesToFetch := []string{t.Path} if t.Resources != "" { resourcesToFetch = append( resourcesToFetch, strings.Split(t.Resources, config.ThermosExecutorDelimiter)..., ) } var mesosUris []*apachemesos.PodSpec_URI for _, r := range resourcesToFetch { mesosUris = append(mesosUris, &apachemesos.PodSpec_URI{ Value: r, Executable: true, // Previously when we were using mesos.v1.CommandInfo.URI, // "extract" field was left as unfilled, however, since in // the proto definition the field is default to true, the // actual value we passed to mesos is actually true. // // In pod.apachemesos.PodSpec.URI, "extract" field can no // longer have a different value, in order to retain // the previous behavior and to avoid unnecessary instance // restarts, we are hard-code this value to true. Extract: true, }) } result.Uris = mesosUris result.Shell = true result.ExecutorSpec = &apachemesos.PodSpec_ExecutorSpec{ Type: apachemesos.PodSpec_ExecutorSpec_EXECUTOR_TYPE_CUSTOM, ExecutorId: config.ThermosExecutorIDPlaceholder, Data: executorData, } if t.CPU > 0 || t.RAM > 0 { result.ExecutorSpec.Resources = &apachemesos.PodSpec_ExecutorSpec_Resources{ Cpu: t.CPU, MemMb: float64(t.RAM), } } return result } func newVolumes(c *api.Container) ([]*volume.VolumeSpec, []*pod.VolumeMount) { if c == nil { return nil, nil } var volumes []*volume.VolumeSpec var volumeMounts []*pod.VolumeMount vs := c.GetMesos().GetVolumes() for _, v := range vs { volume := &volume.VolumeSpec{ Name: v.GetHostPath(), Type: volume.VolumeSpec_VOLUME_TYPE_HOST_PATH, HostPath: &volume.VolumeSpec_HostPathVolumeSource{ Path: v.GetHostPath(), }, } volumeMount := &pod.VolumeMount{ Name: v.GetHostPath(), ReadOnly: isReadOnly(v.GetMode()), MountPath: v.GetContainerPath(), } volumes = append(volumes, volume) volumeMounts = append(volumeMounts, volumeMount) } return volumes, volumeMounts } func isReadOnly(mode api.Mode) bool { return mode == api.ModeRo } func newDockerParameters(ps []*api.DockerParameter) []*apachemesos.PodSpec_DockerParameter { var result []*apachemesos.PodSpec_DockerParameter for _, p := range ps { result = append(result, &apachemesos.PodSpec_DockerParameter{ Key: p.GetName(), Value: p.GetValue(), }) } return result }