benchmarks/benchmark/tools/model-load-benchmark/deployment/deployment.go (110 lines of code) (raw):

package deployment import ( "bytes" "fmt" "os" "tool/config" "google.golang.org/protobuf/proto" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/util/yaml" yamlFmt "sigs.k8s.io/yaml" ) const ( gcsFuseVolumeName string = "gcs-fuse-csi-ephemeral" ) type Deployment struct { Config *config.Config Pod *v1.Pod } // NewDeployment initializes a new Deployment func NewDeployment(cfg *config.Config) (*Deployment, error) { d := &Deployment{Config: cfg} if e := d.ParsePod(); e != nil { return nil, fmt.Errorf("failed to setup Deployment %v", e) } d.setupDeployment() return d, nil } // ParsePod parses the base pod spec to deploy func (d *Deployment) ParsePod() error { yamlData, err := os.ReadFile(d.Config.GetBasePodSpec()) if err != nil { return fmt.Errorf("failed to read YAML file: %v", err) } decoder := yaml.NewYAMLOrJSONDecoder(bytes.NewReader(yamlData), 256) pod := &v1.Pod{} err = decoder.Decode(pod) if err != nil { return fmt.Errorf("failed to decode YAML: %v", err) } d.Pod = pod return nil } func (d *Deployment) setupDeployment() { baseAnnotations := d.Pod.GetAnnotations() if baseAnnotations == nil { baseAnnotations = map[string]string{} } filteredVolumes := []v1.Volume{} for _, v := range d.Pod.Spec.Volumes { if v.Name != gcsFuseVolumeName { filteredVolumes = append(filteredVolumes, v) } } d.Pod.Spec.Volumes = append(filteredVolumes, d.getCSIVolume()) d.setAnnotations(baseAnnotations) for i := range d.Pod.Spec.Containers { filteredMounts := []v1.VolumeMount{} for _, vm := range d.Pod.Spec.Containers[i].VolumeMounts { if vm.Name != gcsFuseVolumeName { filteredMounts = append(filteredMounts, vm) } } d.Pod.Spec.Containers[i].VolumeMounts = filteredMounts d.Pod.Spec.Containers[i].VolumeMounts = append(d.Pod.Spec.Containers[i].VolumeMounts, getGcsVolMount()) } } func getGcsVolMount() v1.VolumeMount { return v1.VolumeMount{ Name: gcsFuseVolumeName, MountPath: "/data", } } func (d *Deployment) setAnnotations(curAnnotations map[string]string) map[string]string { curAnnotations["gke-gcsfuse/volumes"] = "true" sideCarAnnotations := d.Config.SideCarResources.ToMap() for k, v := range sideCarAnnotations { curAnnotations[k] = v } return curAnnotations } func (d *Deployment) getCSIVolume() v1.Volume { return v1.Volume{ Name: gcsFuseVolumeName, VolumeSource: v1.VolumeSource{ CSI: &v1.CSIVolumeSource{ Driver: "gcsfuse.csi.storage.gke.io", ReadOnly: proto.Bool(true), VolumeAttributes: d.Config.VolumeAttributes.ToMap(), }, }, } } // ToYAML Returns pod yaml func (d *Deployment) ToYAML() (string, error) { scheme := runtime.NewScheme() err := v1.AddToScheme(scheme) if err != nil { return "", fmt.Errorf("failed to register scheme: %v", err) } codecs := serializer.NewCodecFactory(scheme) encoder := codecs.LegacyCodec(v1.SchemeGroupVersion) yamlBytes, err := runtime.Encode(encoder, d.Pod) if err != nil { return "", fmt.Errorf("failed to encode deployment to YAML: %v", err) } formattedYAML, err := yamlFmt.JSONToYAML(yamlBytes) if err != nil { return "", fmt.Errorf("failed to format YAML output: %v", err) } return string(formattedYAML), nil }