modules/compute/gke-node-pool/gpu-direct-workload/scripts/enable-tcpx-in-workload.py (83 lines of code) (raw):
# Copyright 2024 "Google LLC"
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import yaml
import argparse
import os
def main():
    """Generate a TCPX-enabled copy of a job manifest.

    Parses ``-f/--file`` (path to the job template) and ``-r/--rxdm`` (RxDM
    version), loads the template, applies the TCPX annotations, volumes,
    tolerations, ``tcpx-daemon`` container and GPU-container updates, and
    writes the result next to the input as ``<name>-tcpx<ext>``. The location
    of the new file and a sample ``kubectl`` command are printed at the end.
    """
    parser = argparse.ArgumentParser(description="TCPX Job Manifest Generator")
    parser.add_argument("-f", "--file", required=True, help="Path to your job template YAML file")
    parser.add_argument("-r", "--rxdm", required=True, help="RxDM version")
    args = parser.parse_args()

    # Both options are required by argparse, so these prompts are only reached
    # when an empty string was passed explicitly (e.g. `-f ""`).
    if not args.file:
        args.file = input("Please provide the path to your job template YAML file: ")
    if not args.rxdm:
        args.rxdm = input("Enter the RxDM version: ")

    # Load the template. Read it as UTF-8 so it matches the encoding used when
    # the result is written back out.
    with open(args.file, "r", encoding="utf-8") as file:
        job_manifest = yaml.load(file, Loader=yaml.BaseLoader)

    # Apply every TCPX modification in place.
    add_annotations(job_manifest)
    add_volumes(job_manifest)
    add_tolerations(job_manifest)
    add_tcpx_daemon_container(job_manifest, args.rxdm)
    update_gpu_containers(job_manifest)

    # Dump every scalar in literal block style, then drop the "|-" indicators
    # from the text.
    # NOTE(review): str.replace removes *every* "|-" occurrence, including any
    # that appear inside values coming from the user's template.
    updated_job = str(
        yaml.dump(job_manifest, default_flow_style=False, width=1000, default_style="|", sort_keys=False)
    ).replace("|-", "")

    # Insert "-tcpx" before the extension. Splitting on the extension (rather
    # than replacing ".yaml" anywhere in the path) guarantees the output path
    # differs from the input, so a ".yml" or extension-less template is never
    # overwritten and directory names containing ".yaml" are left alone.
    root, ext = os.path.splitext(args.file)
    new_file_name = f"{root}-tcpx{ext}"
    with open(new_file_name, "w", encoding="utf-8") as file:
        file.write(updated_job)

    # Tell the user where the manifest is and how to submit it.
    output_path = os.path.abspath(new_file_name)
    print("\nA new manifest has been generated and updated to have TCPX enabled based on the provided workload.")
    print(f"It can be found in {output_path}")
    print("You can use the following commands to submit the sample job:")
    print(f" kubectl create -f {output_path}")
def add_annotations(job_manifest):
    """Merge the TCPX annotations into the pod template metadata.

    Creates ``spec.template.metadata.annotations`` when any part of that path
    is missing. Existing annotations are kept; keys that collide with the TCPX
    ones are overwritten.

    Args:
        job_manifest: Parsed job manifest (nested dicts), modified in place.
    """
    # Both multi-line values start with a literal YAML block-scalar indicator
    # ("|+" / "|") as part of the string itself — presumably so they come out
    # as block scalars after the dump step in main(); confirm before changing.
    tcpx_daemon_devices = (
        "|+\n"
        "- path: /dev/nvidia0\n"
        "- path: /dev/nvidia1\n"
        "- path: /dev/nvidia2\n"
        "- path: /dev/nvidia3\n"
        "- path: /dev/nvidia4\n"
        "- path: /dev/nvidia5\n"
        "- path: /dev/nvidia6\n"
        "- path: /dev/nvidia7\n"
        "- path: /dev/nvidiactl\n"
        "- path: /dev/nvidia-uvm"
    )
    network_interfaces = (
        "|\n"
        "[\n"
        '{"interfaceName":"eth0","network":"default"},\n'
        '{"interfaceName":"eth1","network":"vpc1"},\n'
        '{"interfaceName":"eth2","network":"vpc2"},\n'
        '{"interfaceName":"eth3","network":"vpc3"},\n'
        '{"interfaceName":"eth4","network":"vpc4"}\n'
        "]"
    )

    # Walk down to the pod template metadata, creating levels as needed.
    metadata = (
        job_manifest.setdefault("spec", {})
        .setdefault("template", {})
        .setdefault("metadata", {})
    )

    # Merge into whatever annotations are already there (or a fresh dict).
    metadata.setdefault("annotations", {}).update(
        {
            "devices.gke.io/container.tcpx-daemon": tcpx_daemon_devices,
            "networking.gke.io/default-interface": "eth0",
            "networking.gke.io/interfaces": network_interfaces,
        }
    )
def add_tolerations(job_manifest):
    """Append the ``user-workload`` toleration to the pod spec.

    Creates ``spec.template.spec.tolerations`` when any part of that path is
    missing; tolerations already present are kept and the new one is appended
    after them.

    Args:
        job_manifest: Parsed job manifest (nested dicts), modified in place.
    """
    # The value carries its own double quotes: the string is literally "true"
    # including the quote characters.
    user_workload_toleration = {
        "key": "user-workload",
        "operator": "Equal",
        "value": '"true"',
        "effect": "NoSchedule",
    }

    # Walk down to the pod spec, creating levels as needed.
    pod_spec = (
        job_manifest.setdefault("spec", {})
        .setdefault("template", {})
        .setdefault("spec", {})
    )
    pod_spec.setdefault("tolerations", []).extend([user_workload_toleration])
def add_volumes(job_manifest):
    """Append the volumes used by the TCPX setup to the pod spec.

    Adds the ``libraries``, ``tcpx-socket``, ``sys`` and ``proc-sys`` volumes.
    Creates ``spec.template.spec.volumes`` when any part of that path is
    missing; volumes already present are kept and the new ones follow them.

    Args:
        job_manifest: Parsed job manifest (nested dicts), modified in place.
    """
    # Host directories exposed to the pod, keyed by volume name.
    host_paths = (
        ("libraries", "/home/kubernetes/bin/nvidia/lib64"),
        ("sys", "/sys"),
        ("proc-sys", "/proc/sys"),
    )
    libraries, sys_volume, proc_sys = (
        {"name": name, "hostPath": {"path": path}} for name, path in host_paths
    )
    tcpx_socket = {"name": "tcpx-socket", "emptyDir": {}}

    # Walk down to the pod spec, creating levels as needed.
    pod_spec = (
        job_manifest.setdefault("spec", {})
        .setdefault("template", {})
        .setdefault("spec", {})
    )
    pod_spec.setdefault("volumes", []).extend(
        [libraries, tcpx_socket, sys_volume, proc_sys]
    )
def add_tcpx_daemon_container(job_template, rxdm_version):
    """Insert the ``tcpx-daemon`` container as the first container of the pod.

    Creates ``spec.template.spec.containers`` when any part of that path is
    missing; containers already present are kept and shifted after the daemon.

    Args:
        job_template: Parsed job manifest (nested dicts), modified in place.
        rxdm_version: Tag appended to the ``tcpgpudmarxd-dev`` image reference.
    """
    # The command is one multi-line string shaped like a YAML sequence rather
    # than a Python list; the last entry contains literal backslash-quote pairs.
    daemon_command = (
        "- /tcpgpudmarxd/build/app/tcpgpudmarxd\n"
        "- --gpu_nic_preset\n"
        "- a3vm\n"
        "- --gpu_shmem_type\n"
        "- fd\n"
        "- --uds_path\n"
        "- /run/tcpx\n"
        "- --setup_param\n"
        '- \\"--verbose 128 2 0 \\"'
    )
    daemon_mounts = [
        {"name": "libraries", "mountPath": "/usr/local/nvidia/lib64"},
        {"name": "tcpx-socket", "mountPath": "/run/tcpx"},
        {"name": "sys", "mountPath": "/hostsysfs"},
        {"name": "proc-sys", "mountPath": "/hostprocsysfs"},
    ]
    daemon = {
        "name": "tcpx-daemon",
        # The image tag is the RxDM version supplied by the caller.
        "image": f"us-docker.pkg.dev/gce-ai-infra/gpudirect-tcpx/tcpgpudmarxd-dev:{rxdm_version}",
        "imagePullPolicy": "Always",
        "command": daemon_command,
        "securityContext": {"capabilities": {"add": ["NET_ADMIN"]}},
        "volumeMounts": daemon_mounts,
        "env": [{"name": "LD_LIBRARY_PATH", "value": "/usr/local/nvidia/lib64"}],
    }

    # Walk down to the container list, creating levels as needed, and put the
    # daemon in front of any existing containers.
    containers = (
        job_template.setdefault("spec", {})
        .setdefault("template", {})
        .setdefault("spec", {})
        .setdefault("containers", [])
    )
    containers.insert(0, daemon)
def update_gpu_containers(job_manifest):
    """Add the TCPX env var and volume mounts to every GPU container.

    A container counts as a GPU container when its
    ``resources.limits["nvidia.com/gpu"]`` converts to an integer greater than
    zero. Every container under ``spec.template.spec.containers`` gets ``env``
    and ``volumeMounts`` lists if it lacks them (left empty for non-GPU
    containers); GPU containers additionally receive ``LD_LIBRARY_PATH`` and
    the ``tcpx-socket`` / ``libraries`` mounts. A manifest without that path
    is left untouched.

    Args:
        job_manifest: Parsed job manifest (nested dicts), modified in place.

    Raises:
        ValueError: If a ``nvidia.com/gpu`` limit cannot be converted by ``int``.
    """
    env_vars = [{"name": "LD_LIBRARY_PATH", "value": "/usr/local/nvidia/lib64"}]
    volume_mounts = [
        {"name": "tcpx-socket", "mountPath": "/tmp"},
        {"name": "libraries", "mountPath": "/usr/local/nvidia/lib64"},
    ]

    pod_spec = job_manifest.get("spec", {}).get("template", {}).get("spec", {})
    for container in pod_spec.get("containers", []):
        # Make sure both lists exist before deciding whether to fill them.
        container.setdefault("env", [])
        container.setdefault("volumeMounts", [])

        gpu_limit = container.get("resources", {}).get("limits", {}).get("nvidia.com/gpu", 0)
        if int(gpu_limit) > 0:
            # Give each container its own copies of the entries. Appending the
            # same dict objects to several containers makes yaml.dump emit
            # anchors/aliases (&id001 / *id001) in the generated manifest.
            container["env"].extend(dict(var) for var in env_vars)
            container["volumeMounts"].extend(dict(mount) for mount in volume_mounts)
# Run the generator only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()