dags/map_reproducibility/utils/common_utils.py (838 lines of code) (raw):

# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. "Bash helper commands for AOTC artifacts" import os import tempfile import yaml import random import string import time import subprocess import getpass import logging from airflow.decorators import task from airflow.hooks.subprocess import SubprocessHook from xlml.utils import metric from xlml.apis import metric_config from dags.map_reproducibility.utils.benchmarkdb_utils import write_run from datetime import datetime, timezone from dags import composer_env from google.cloud import storage from typing import Optional # Configure logging logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) PROJECT = "supercomputer-testing" BUCKET_NAME = "regression-testing-xlml" MAX_TFLOP = {"a3ultra": 989, "a3mega": 989, "a4": 2237} class Config: """ A simple configuration class that allows dot notation access to dictionary keys. """ def __init__(self, **kwargs): self.__dict__.update(kwargs) def __repr__(self): return repr(self.__dict__) def __str__(self): return str(self.__dict__) # This is required to get auth to access def git_cookie_authdaemon(): auth_cmds = ( "git clone https://gerrit.googlesource.com/gcompute-tools", "echo 'trying to run git-cookie-authdaemon'", # Check if the daemon is already running "if (( $(ps aux | grep git-cookie-authdaemon | grep -v -E 'airflow|grep|bash' | wc -l)>0 )) ; then " # greater than one because one would be the main job " echo 'git-cookie-authdaemon is already running' ", "else " " (./gcompute-tools/git-cookie-authdaemon >/dev/null 2>&1 &) ", # Run if not running "sleep 4", "fi", "ps aux | grep git-cookie-authdaemon | grep -v -E 'airflow|grep|bash'", ) return auth_cmds def clone_recipes_gob(): gob_clone_cmds = ( "echo 'trying to clone GoB repo from outside'", "git clone https://ai-hypercomputer-benchmarks.googlesource.com/" "reproducible-benchmark-recipes", ) return gob_clone_cmds def clone_internal_recipes_gob(): gob_clone_cmds = ( "echo 'trying to clone internal GoB repo'", "git clone https://jax3p-gpu-benchmarking.googlesource.com/" "internal-gpu-recipes", ) return gob_clone_cmds def get_bq_writer_repo(): gob_clone_cmds = ( "echo 'trying to clone GoB bq writer repo'", "git clone https://cmcs-perf-tooling-internal.googlesource.com/" "benchmark-automation", ) return gob_clone_cmds def configure_project_and_cluster(cluster: str, cluster_region: str): set_project_command = ( f"gcloud config set project {PROJECT}", "sudo chown -R airflow:airflow /home/airflow/composer_kube_config", "gcloud container clusters get-credentials " f"{cluster} --region {cluster_region}", ) return set_project_command def get_gpu_recipe_cmd(hypercomputer, model_id, framework, recipe_repo_root): gpu_recipe_cmd = ( "cd reproducible-benchmark-recipes/projects/gpu-recipes", "export RECIPE_ROOT=" f"{recipe_repo_root}/training/{hypercomputer}/{model_id}/{framework}-pretraining-gke", "cd $RECIPE_ROOT", ) return gpu_recipe_cmd def get_pre_workload_cmds(model_id, framework): prepare_workload_cmds = ( "NOW=$(date +%s)", f"export JOB_NAME=imo-team-regr-test-{model_id}-$NOW-{framework}", ) return prepare_workload_cmds def get_internal_pre_workload_cmds(job_name): prepare_workload_cmds = (f"export JOB_NAME={job_name}",) return prepare_workload_cmds def get_internal_pre_workload_job_name( model_id, precision, num_gpus, framework, cluster, is_sample_run=False ): helm_model_id = model_id.replace(".", "-") random_id = "".join(random.choices(string.ascii_lowercase, k=4)) now = int(time.time()) job_name = f"cml-{helm_model_id}-{precision}-{num_gpus}-{cluster[:3]}-{framework[:1]}-{now}-{random_id}" if is_sample_run: job_name = f"{getpass.getuser()}-{job_name}" print(f"{'*' * 20}NAME: {job_name}") return job_name def find_xprof_gcs_path(gcs_path): """ Find the .xplane.pb file in the latest date blob from the specified GCS path. Args: gcs_path (str): Full GCS path in the format gs://bucket-name/folder/path/ Returns: str: Path to the .xplane.pb file in the latest date blob """ path_without_prefix = gcs_path.removeprefix("gs://") parts = path_without_prefix.split("/", 1) bucket_name = parts[0] print(f"Bucket name: {bucket_name}") prefix = parts[1] if len(parts) > 1 else "" storage_client = storage.Client() bucket = storage_client.get_bucket(bucket_name) # List all blobs in the bucket with the given prefix print(f"Prefix: {prefix}") blobs = list(bucket.list_blobs(prefix=prefix)) # Look for .xplane.pb file in the latest directory xplane_pb_file = None for blob in blobs: if blob.name.endswith(".xplane.pb"): xplane_pb_file = blob.name break if not xplane_pb_file: print(f"No .xplane.pb file found in {gcs_path}") return None full_xplane_pb_file = f"gs://{bucket_name}/{xplane_pb_file}" print(f"Found .xplane.pb file: {full_xplane_pb_file}") return full_xplane_pb_file def get_patheon_job_link(region, cluster_name, job_name): pantheon_link = f"https://pantheon.corp.google.com/kubernetes/job/{region}/{cluster_name}/default/{job_name}" print(f"{'*' * 20}LINK: {pantheon_link}") return pantheon_link def install_helm_cmds(): install_helm_cmd = ( "curl -fsSL -o get_helm.sh " "https://raw.githubusercontent.com/helm/helm/main/scripts/get-helm-3", "chmod 700 get_helm.sh", "./get_helm.sh", ) return install_helm_cmd # By default the composer environment overwrites the # namespaces to airflow namespaces. # In order to prevent that it is necessary explicitly # change the namespace to default. def namespace_cmds(): namespace = ( "kubectl config view | grep namespace", "kubectl config set-context --current --namespace=default", "kubectl config set-context helm --namespace=default", ) return namespace def helm_apply_cmds( framework: str, hypercomputer: str, config_file, recipe_repo_root, docker_image, aotc: bool = False, cluster_name: str = "a3plus-benchmark", kueue_name: str = None, additional_cmds: str = "", num_steps: int = None, ): gcs_cmd = "" if hypercomputer in ("a3ultra", "a4"): if framework != "maxtext" and kueue_name: gcs_cmd = f" --set queue={kueue_name}" gcs_cmd += f" --set volumes.gcsMounts[0].bucketName={BUCKET_NAME}" else: gcs_cmd = f" --set workload.gcsBucketForDataCataPath={BUCKET_NAME}" if num_steps: additional_cmds += f" --set workload.steps={num_steps} " cluster_cmd = "" if framework == "nemo" and hypercomputer == "a3ultra": cluster_cmd = f" --set clusterName={cluster_name}" run_name_cmd = "" if framework == "maxtext": run_name_cmd = "--set workload.run_name=$JOB_NAME" set_aotc = "" if aotc: set_aotc = " --set-string workload.aotc=true " helm_cmds = ( " helm install -f values.yaml " "--namespace default " "--set namespace=default" f" --set-file {framework}_config" f"={config_file}" " --set workload.image" f"={docker_image} " f"{cluster_cmd} {run_name_cmd} {gcs_cmd} {set_aotc}" f"{additional_cmds}" f" $JOB_NAME {recipe_repo_root}/src/helm-charts/{hypercomputer}/{framework}-training", ) return helm_cmds def helm_apply_cmds_internal_run( framework: str, hypercomputer: str, config_file, recipe_repo_root, values_file_path, docker_image, aotc: bool = False, cluster_name: str = "a3plus-benchmark", kueue_name: str = "a3-ultra", additional_cmds: str = "", bucket_name=BUCKET_NAME, ): gcs_cmd = "" if framework == "maxtext": gcs_cmd += f" --set volumes.gcsMounts[0].bucketName={bucket_name} " if hypercomputer == "a3ultra": if framework != "maxtext": gcs_cmd += f" --set queue={kueue_name} " else: gcs_cmd += f" --set workload.gcsBucketForDataCataPath={bucket_name} " cluster_cmd = "" if framework == "nemo" and hypercomputer == "a3ultra": cluster_cmd = f" --set clusterName={cluster_name} " run_name_cmd = "" if framework == "maxtext": run_name_cmd = " --set workload.run_name=$JOB_NAME " set_aotc = "" if aotc: set_aotc = " --set-string workload.aotc=true " helm_template_path = ( f"{recipe_repo_root}/src/helm-charts/{hypercomputer}/{framework}-training" ) print(f"helm_template_path is {helm_template_path}") helm_cmds = ( f" helm install -f {values_file_path} " "--namespace default " "--set namespace=default" f" --set-file {framework}_config" f"={config_file}" " --set workload.image" f"={docker_image} " f"{cluster_cmd} {run_name_cmd} {gcs_cmd} {set_aotc}" f"{additional_cmds}" # f" $JOB_NAME {recipe_repo_root}/src/helm-charts/{hypercomputer}/{framework}-training", f" $JOB_NAME {helm_template_path}", ) print("*******helm cmd is*******") print(helm_cmds) return helm_cmds def wait_for_jobs_cmds(): wait_for_job = ( "kubectl get pods --selector=job-name=$JOB_NAME --namespace=default", "echo 'will wait for jobs to finish'", "kubectl wait --for=condition=complete " "job/$JOB_NAME --namespace=default --timeout=100m", ) return wait_for_job def internal_wait_for_jobs_cmds(timeout="100m"): timeout = str(timeout) if not timeout.endswith("m"): timeout += "m" wait_for_job = ( "kubectl describe job $JOB_NAME --namespace=default", "kubectl get pods --selector=job-name=$JOB_NAME --namespace=default", "echo 'will wait for jobs to finish'", f"kubectl wait --for=condition=complete job/$JOB_NAME --namespace=default --timeout={timeout}", "helm status $JOB_NAME --namespace=default", "kubectl describe job $JOB_NAME --namespace=default", "kubectl get pods --selector=job-name=$JOB_NAME --namespace=default", ) print("**********wait cmd is*********") print(wait_for_job) return wait_for_job def get_job_gcs_bucket_folder(job_name, bucket_name=BUCKET_NAME): """ Get the GCS bucket folder for a specific job. Args: bucket_name (str): The name of the GCS bucket job_name (str): The job name to search for Returns: str: The full path to the bucket folder containing the job """ gcs_location = f"gs://{bucket_name}/maxtext/" bucket_folder_cmd = f"gcloud storage ls {gcs_location} | grep {job_name}" print(f"bucket_folder_cmd: {bucket_folder_cmd}") try: bucket_folder = ( subprocess.check_output(bucket_folder_cmd, shell=True).decode().strip() ) bucket_folder_prefix_removed = bucket_folder.removeprefix("gs://") pantheon_bucket_link = ( "https://pantheon.corp.google.com/storage/browser/" + bucket_folder_prefix_removed ) print(f"BUCKET PANTHEON LINK: {pantheon_bucket_link}") return bucket_folder except subprocess.CalledProcessError as e: print(f"Error finding bucket folder: {e}") return None def copy_bucket_cmds_nemo(recipe_repo_root, hypercomputer: str = "a3mega"): gcs_location = "" if hypercomputer in ("a3ultra", "a4"): gcs_location = f"gs://{BUCKET_NAME}/nemo-experiments/megatron_gpt/" else: gcs_location = f"gs://{BUCKET_NAME}/nemo-experiments/" copy_bucket_contents = ( "export COMPLETE_JOB_NAME=$(gcloud storage ls " f"{gcs_location} | grep $JOB_NAME)", 'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"', f"cd {recipe_repo_root}/src/utils/training_metrics", "gcloud storage cp ${COMPLETE_JOB_NAME}" "dllogger/rank-0/dllogger.json .", ) return copy_bucket_contents def copy_bucket_cmds_maxtext(tmpdir, bucket_name=BUCKET_NAME): gcs_location = f"gs://{bucket_name}/maxtext/" cmds = ( f"METRICS_FILE={tmpdir}/tflog/metrics", "export BUCKET_FOLDER=$(gcloud storage ls " f"{gcs_location} | grep $JOB_NAME)", 'echo "BUCKET_FOLDER ${BUCKET_FOLDER}"', "export COMPLETE_JOB_NAME=$(gcloud storage ls " "${BUCKET_FOLDER}tensorboard/ | grep $JOB_NAME)", 'echo "COMPLETE_JOB_NAME ${COMPLETE_JOB_NAME}"', "export LOG_FILE=$(gcloud storage ls " "${COMPLETE_JOB_NAME} | grep events)", 'echo "LOG_FILE ${LOG_FILE}"', "gcloud storage cp $LOG_FILE $METRICS_FILE", ) return cmds def get_skip_steps_for_metrics_calculation(config: Config): """Extract the number of steps to skip for the profiler from config.""" # case 1: profiler not enabled # skip 2 steps, this is the default skipping since the first 2 steps' metrics are not accurate if not hasattr(config, "profiler"): return 2 # case 2: profiler enabled # skip first n steps for profiler base_skip_steps = getattr(config, "skip_first_n_steps_for_profiler", 1) # skip profiler steps also additional_skip_steps = getattr(config, "profiler_steps", 5) return base_skip_steps + additional_skip_steps def calculate_maxtext_metrics( log_location: str, hardware: str = "a3ultra", skip_first=2, skip_last=2 ): assert skip_last >= 0, "skip_last must be a non-negative integer" metrics, _ = metric.read_from_tb(log_location, None, None) print(f"metrics - {metrics}") step_time_metrics = metrics["perf/step_time_seconds"] # Calculate the sliced metrics based on skip values sliced_metrics = step_time_metrics[skip_first:-skip_last] # Check if the resulting metrics list is empty and raise an error if it is if not sliced_metrics: logger.error( f"Empty metrics list after applying skip_first={skip_first} and skip_last={skip_last}. Original metrics length: {len(step_time_metrics)}" ) # Apply skip_first and skip_last when aggregating avg_step_time = metric.aggregate_metrics( sliced_metrics, metric_config.AggregationStrategy.AVERAGE, ) tflop_per_device_per_sec_metrics = metrics["perf/per_device_tflops_per_sec"] avg_tflop_per_device_per_sec = metric.aggregate_metrics( tflop_per_device_per_sec_metrics, metric_config.AggregationStrategy.AVERAGE, ) mfu = avg_tflop_per_device_per_sec / MAX_TFLOP[hardware] return mfu, avg_step_time def get_nemo_metrics_cmds( batch_size, num_accelerators, precision, model_id, accelertator_type, temdir, two_node: bool = False, ): step_cmd = "" if two_node: step_cmd = "--start_step 0 --end_step 0 " cmds = ( f"METRICS_FILE={temdir}/metrics.txt", "python3 process_training_results.py --file" f" dllogger.json --batch_size {batch_size} " f"--num_accelerators {num_accelerators} " f"--precision {precision} " f"--model_type {model_id} " f"{step_cmd}" f"--accelerator_type {accelertator_type} | " "gsutil cp - $METRICS_FILE", ) return cmds def cleanup_all_runs_cmds(cluster, cluster_region, prefix="cml-"): cleanup_cmds = ( f"echo 'Getting credentials for cluster {cluster}...' && gcloud container clusters get-credentials {cluster} --region {cluster_region} --project {PROJECT} ", f"echo 'Uninstalling jobs with prefix {prefix}...' && JOBS=$(kubectl get job -n default | grep \"^{prefix}\" | awk '{{print $1}}') && if [ -n \"$JOBS\" ]; then echo \"$JOBS\" | xargs -L1 helm uninstall -n default; else echo 'No matching jobs found'; fi", ) return cleanup_cmds def cleanup_cmds(): cleanup = ( "kubectl config set-context --current --namespace=default ", # Attempt Helm uninstall first, continue even if it fails "helm uninstall $JOB_NAME -n default --wait || true ", # Give Helm resources time to fully clean up "echo 'Waiting 60 seconds for helm uninstall... '", "sleep 60 ", "echo 'Attempting regular job and pod deletion... '", # Track if job exists and attempt standard deletion if it does "JOB_EXISTS=false", "if kubectl get job $JOB_NAME &>/dev/null; then JOB_EXISTS=true; kubectl delete job/$JOB_NAME --grace-period=30; else echo 'Job not found, skipping regular deletion'; fi ", # Track if pods exist and attempt standard deletion if they do "PODS_EXIST=false", "if kubectl get pods -l job-name=$JOB_NAME 2>&1 | grep -q 'No resources found'; then echo 'No pods found, skipping deletion'; else PODS_EXIST=true; kubectl delete pods -l job-name=$JOB_NAME --grace-period=30; fi ", # Only wait if there were resources to delete "[ \"$JOB_EXISTS\" = true ] || [ \"$PODS_EXIST\" = true ] && { echo 'Waiting 30 seconds for kubectl graceful termination... '; sleep 30; } || echo 'No resources found, skipping wait period' ", # Attempt force deletion of job if it still exists now "if kubectl get job $JOB_NAME &>/dev/null; then echo 'Job still exists, using force deletion...'; kubectl delete job $JOB_NAME --force --grace-period=0; else echo 'No job to force delete'; fi ", # Attempt force deletion of pods if they existed before and still exist now "if ! kubectl get pods -l job-name=$JOB_NAME 2>&1 | grep -q 'No resources found'; then echo 'Pods still exist, using force deletion...'; kubectl delete pods -l job-name=$JOB_NAME --force --grace-period=0; else echo 'No pods to force delete'; fi ", "echo 'Cleanup completed'", ) print("**********cleanup cmd is*********") print(cleanup) return cleanup def get_nemo_metrics(temdir): file_content = "" with open(temdir + "/metrics.txt", "r", encoding="utf-8") as file: file_content = file.read() # Parse the metrics (adjust based on your file format) lines = file_content.splitlines() average_step_time = float(lines[0].split(": ")[1]) tflops_per_accelerator = float(lines[1].split(": ")[1]) mfu = float(lines[2].split(": ")[1]) print(f"Average Step Time: {average_step_time}") print(f"TFLOPS/Accelerator: {tflops_per_accelerator}") print(f"MFU: {mfu}") return average_step_time, mfu def get_internal_recipe_repo_path(tmpdir): recipe_repo_root = os.path.join(tmpdir, "internal-gpu-recipes") return recipe_repo_root def extract_gpus(tmpdir, yaml_file): gpus = None try: yaml_file_path = os.path.join(tmpdir, yaml_file) with open(yaml_file_path, "r", encoding="utf-8") as file: config = yaml.safe_load(file) gpus = config.get("workload", {}).get("gpus") except (FileNotFoundError, yaml.YAMLError) as e: print(f"Error: {e}") return None return gpus def extract_run_details(root, config_path): batch_size = None optimizer = None try: config_path = os.path.join(root, config_path) with open(config_path, "r", encoding="utf-8") as file: config = yaml.safe_load(file) batch_size = config.get("model", {}).get("global_batch_size") optimizer = config.get("model", {}).get("optim", {}).get("name") seq_length = config.get("model", {}).get("data", {}).get("seq_length") max_steps = config.get("trainer", {}).get("max_steps") except (FileNotFoundError, yaml.YAMLError) as e: print(f"Error: {e}") return None return batch_size, optimizer, seq_length, max_steps def get_accelerator_type(hypercomputer: str): if hypercomputer == "a3ultra": return "h200" elif hypercomputer == "a3mega": return "h100" elif hypercomputer == "a4": return "b200" def get_bq_writer_path(tempdir): return os.path.join(tempdir, "benchmark-automation/benchmark_db_writer/src") def get_recipe_repo_path(tmpdir): recipe_repo_root = os.path.join( tmpdir, "reproducible-benchmark-recipes/projects/gpu-recipes" ) return recipe_repo_root def get_cluster(hardware: str = "a3ultra"): if hardware == "a3mega": return "a3plus-benchmark", "australia-southeast1" if hardware == "a3ultra": return "gke-a3ultra-bm-map-3", "europe-west1" if hardware == "a4": return "gke-a4-shared", "us-central1" def get_scheduled_time(hardware: str, model: str, framework: str): """ Returns a cron expression for the DAG schedule based on the given hardware, model, and framework. Each model runs on Thursday on a unique time so that we have free nodes for each. The alloted time for these tests is 6 pm - 10 pm PST on Thursday. 6 PM pst - 0 2 * * 5 10 PM pst - 0 6 * * 5 Args: hardware: The hardware type (e.g., "a3ultra", "a3mega"). model: The model ID (e.g., "mixtral-8x7b", "llama-3.1-70b"). framework: The framework (e.g., "nemo", "maxtext"). Returns: A cron expression string (e.g., "0 12 * * 4") or None if no schedule is defined for the given combination. """ schedule_map = { "a3ultra": { "mixtral-8x7b": { "nemo": "0 3 * * 5", "maxtext": "0 2 * * 5", # 6 PM PST on Thursday }, "llama3-1-70b": { "nemo": "0 4 * * 5", "maxtext": "0 4 * * 5", }, "llama3-1-405b": { "nemo": "0 5 * * 5", "maxtext": "0 5 * * 5", }, }, "a3mega": { "mixtral-8x7b": { "nemo": "0 4 * * 5", "maxtext": "0 3 * * 5", }, "llama3-70b": { "nemo": "0 2 * * 5", "maxtext": "0 5 * * 5", }, "llama3-1-70b": { "nemo": "0 2 * * 5", "maxtext": "0 4 * * 5", }, "gpt3-175b": { "nemo": "0 4 * * 5", }, }, "a4": { "mixtral-8x7b": { "nemo": "0 2 * * 5", }, "llama3-1-70b": { "nemo": "0 3 * * 5", "maxtext": "0 3 * * 5", }, "llama3-1-405b": { "nemo": "0 4 * * 5", "maxtext": "0 4 * * 5", }, }, } if hardware in schedule_map: if model in schedule_map[hardware]: if framework in schedule_map[hardware][model]: return schedule_map[hardware][model][framework] return None # Return None if no schedule is found for the given combination def get_docker_image( hardware: str, framework: str, model_id: Optional[str] = None ): """ Returns the appropriate Docker image based on the given hardware,framework and model. Args: hardware: The hardware type (e.g., "a3ultra", "a3mega"). framework: The framework (e.g., "nemo", "maxtext"). model_id: The model_id. Optional. Returns: A Docker image string or None if no image is defined for the given combination. """ image_map = { "a3ultra": { "nemo": { "default": "us-central1-docker.pkg.dev/deeplearning-images/reproducibility/pytorch-gpu-nemo-nccl:nemo24.07-gib1.0.3-A3U", "llama3-1-405b": "us-central1-docker.pkg.dev/deeplearning-images/reproducibility/pytorch-gpu-nemo-nccl:nemo24.12-gib1.0.3-A3U", }, "maxtext": { "default": "us-central1-docker.pkg.dev/supercomputer-testing/gunjanjalori/maxtext-benchmark" }, }, "a3mega": { "nemo": { "default": "us-central1-docker.pkg.dev/deeplearning-images/reproducibility/pytorch-gpu-nemo:nemo24.07-A3Mega" }, "maxtext": { "default": "us-central1-docker.pkg.dev/supercomputer-testing/gunjanjalori/maxtext-benchmark" }, }, "a4": { "nemo": { "default": "us-central1-docker.pkg.dev/deeplearning-images/reproducibility/pytorch-gpu-nemo-nccl:nemo25.02-gib1.0.5-A4" }, "maxtext": { "default": "us-central1-docker.pkg.dev/deeplearning-images/reproducibility/jax-maxtext-gpu:jax0.5.1-cuda_dl25.02-rev1-maxtext-20150317" }, }, } if hardware in image_map: if framework in image_map[hardware]: if model_id: if model_id in image_map[hardware][framework]: return image_map[hardware][framework][model_id] else: return None else: return image_map[hardware][framework]["default"] return None # Return None if no image is found for the given combination def get_internal_docker_image(hardware: str, framework: str): """ Returns the appropriate Docker image based on the given hardware, model, and framework. Args: hardware: The hardware type (e.g., "a3ultra", "a3mega"). framework: The framework (e.g., "nemo", "maxtext"). Returns: A Docker image string or None if no image is defined for the given combination. """ utc_date = datetime.now(timezone.utc).strftime("%Y-%m-%d") image_map = { "a3ultra": { "nemo": "us-central1-docker.pkg.dev/deeplearning-images/reproducibility/pytorch-gpu-nemo-nccl:nemo24.07-gib1.0.3-A3U", "maxtext": f"gcr.io/tpu-prod-env-multipod/maxtext_gpu_stable_stack_nightly_jax:{utc_date}", }, "a3mega": { "nemo": "us-central1-docker.pkg.dev/deeplearning-images/reproducibility/pytorch-gpu-nemo:nemo24.07-A3Mega", "maxtext": f"gcr.io/tpu-prod-env-multipod/maxtext_gpu_stable_stack_nightly_jax:{utc_date}", }, } if hardware in image_map: if framework in image_map[hardware]: return image_map[hardware][framework] return None # Return None if no image is found for the given combination def get_two_node_cmds(hypercomputer: str = "a3ultra"): cmd = ' --set workload.arguments="{trainer.max_steps=1}" --set workload.gpus=16 ' if hypercomputer == "a3mega": cmd += '--set workload.arguments="{model.pipeline_model_parallel_size=2}"' return cmd def parse_internal_config_filename(filename, config=None): """ Parse configuration values embedded in the filename. Args: filename (str): Example: "a3ultra_llama2-7b_8gpus_fp16_maxtext_pgle.yaml" config (Config, optional): Existing Config object to update. If None, a new one is created. Returns: Config: Configuration object with dot notation access """ parts = filename.split(".yaml")[0].split("_") hypercomputer = parts[0] model_id_raw = parts[1] model_id = model_id_raw.replace("llama", "llama-") num_gpus = int(parts[2].replace("gpus", "")) precision = parts[3] framework = parts[4] is_pgle = len(parts) >= 6 and parts[5] == "pgle" software_id = f"{'jax' if framework == 'maxtext' else 'pytorch'}_{framework}" filename_config = { "MODEL_ID": model_id, "HELM_NAME_MODEL_ID": model_id_raw.replace(".", "-"), "PRECISION": precision, "HYPERCOMPUTER": hypercomputer, "FRAMEWORK": framework, "SOFTWARE_ID": software_id, "NUM_GPUS": num_gpus, "IS_PGLE": is_pgle, } if config is None: return Config(**filename_config) else: config.__dict__.update(filename_config) return config def parse_internal_config_content(yaml_path, config=None): """ Parse the internal content of a config YAML file and update the existing config. Args: yaml_path (str): Path to the YAML file config (Config, optional): Existing Config object to update. If None, a new one is created. Returns: Config: Updated configuration object with dot notation access """ try: with open(yaml_path, "r") as file: result = yaml.safe_load(file) if config is None: config = Config(**result) else: config.__dict__.update(result) print("******* configs are ********") print(config) return config except Exception as e: print(f"Unexpected error: {e}") raise e @task def run_nemo_workload( hypercomputer: str, model_id: str, framework: str, precision: str, metrics_model_id: str, num_gpus: int = None, num_steps: int = None, two_node: bool = False, kueue_name: str = None, config_model_name: str = None, ): with tempfile.TemporaryDirectory() as tmpdir: hook = SubprocessHook() result = hook.run_command( [ "bash", "-c", ";".join( git_cookie_authdaemon() + clone_recipes_gob() + get_bq_writer_repo() ), ], cwd=tmpdir, ) recipe_repo_root = get_recipe_repo_path(tmpdir) bq_writer_repo_root = get_bq_writer_path(tmpdir) value_yaml_path = f"training/{hypercomputer}/{model_id}/{framework}-pretraining-gke/values.yaml" num_gpus_file = extract_gpus(recipe_repo_root, value_yaml_path) if config_model_name: config_yaml_path = f"src/frameworks/{hypercomputer}/{framework}-configs/{config_model_name}" else: config_hardware = f"{'a3u-' if hypercomputer == 'a3ultra' else ''}" config_yaml_path = f"src/frameworks/{hypercomputer}/{framework}-configs/{model_id}-{num_gpus_file}gpus-{config_hardware}{precision}.yaml" full_config_yaml_path = os.path.join(recipe_repo_root, config_yaml_path) ( global_batch_size, optimizer, seq_length, num_steps, ) = extract_run_details(recipe_repo_root, config_yaml_path) accelerator_type = get_accelerator_type(hypercomputer) print( f"batch size: {global_batch_size}, num gpus: {num_gpus}, seq length: {seq_length}, num steps: {num_steps}" ) additional_cmds = "" if two_node == True: additional_cmds += get_two_node_cmds(hypercomputer) if num_gpus: additional_cmds += f" --set workload.gpus={num_gpus} " else: num_gpus = num_gpus_file cluster, cluster_region = get_cluster(hypercomputer) result = hook.run_command( [ "bash", "-c", ";".join( configure_project_and_cluster(cluster, cluster_region) + get_gpu_recipe_cmd( hypercomputer, model_id, framework, recipe_repo_root ) + install_helm_cmds() + namespace_cmds() + get_pre_workload_cmds(model_id, framework) + helm_apply_cmds( framework, hypercomputer, full_config_yaml_path, recipe_repo_root, get_docker_image(hypercomputer, framework, model_id), cluster_name=cluster, kueue_name=kueue_name, additional_cmds=additional_cmds, ) + wait_for_jobs_cmds() + copy_bucket_cmds_nemo( recipe_repo_root, hypercomputer=hypercomputer, ) + get_nemo_metrics_cmds( global_batch_size, num_gpus, precision, metrics_model_id, accelerator_type, tmpdir, two_node=two_node, ) + cleanup_cmds() ), ], cwd=tmpdir, ) assert result.exit_code == 0, f"Command failed with code {result.exit_code}" average_step_time, mfu = get_nemo_metrics(tmpdir) if two_node: num_gpus = 16 write_run( model_id=model_id, hardware_id=hypercomputer, software_id=get_software_id(framework), number_of_nodes=num_gpus / 8, number_of_chips=num_gpus, container_image_name=get_image_version(framework, model_id), global_batch_size=global_batch_size, precision=precision, optimizer=optimizer, seq_length=seq_length, median_step_time=average_step_time, e2e_time=0, number_of_steps=num_steps, mfu=mfu, tokens_per_second=1, writer_path=bq_writer_repo_root, topology="2X2", comment="Regression tests", is_test=(False if composer_env.is_prod_env() else True), ) @task def run_maxtext_workload( hypercomputer: str, model_id: str, framework: str, precision: str, num_steps: int, batch_size_per_device: int, kueue_name: str, optimizer: str, sequence_length: int, helm_model_id: str, num_gpus: int = None, gpu_overide: bool = True, ): with tempfile.TemporaryDirectory() as tmpdir: hook = SubprocessHook() result = hook.run_command( [ "bash", "-c", ";".join( git_cookie_authdaemon() + clone_recipes_gob() + get_bq_writer_repo() ), ], cwd=tmpdir, ) value_yaml_path = f"training/{hypercomputer}/{model_id}/{framework}-pretraining-gke/values.yaml" recipe_repo_root = get_recipe_repo_path(tmpdir) bq_writer_repo_root = get_bq_writer_path(tmpdir) num_gpus_in_file = extract_gpus(recipe_repo_root, value_yaml_path) gpu_helm_cmd = "" if num_gpus == None: num_gpus = num_gpus_in_file elif num_gpus != num_gpus_in_file: gpu_helm_cmd = f" --set workload.gpus={num_gpus} " if gpu_overide == False: num_gpus = num_gpus_in_file # This is for two node tests, they'll use the same config of more nodes config_hardware = ( f"{'a3u' if hypercomputer == 'a3ultra' else hypercomputer}" ) config_yaml_path = f"src/frameworks/{hypercomputer}/maxtext-configs/{model_id}-{num_gpus}gpus-{config_hardware}-{precision}.yaml" full_config_yaml_path = os.path.join(recipe_repo_root, config_yaml_path) cluster, cluster_region = get_cluster(hypercomputer) result = hook.run_command( [ "bash", "-c", ";".join( configure_project_and_cluster(cluster, cluster_region) + get_gpu_recipe_cmd( hypercomputer, model_id, framework, recipe_repo_root ) + install_helm_cmds() + namespace_cmds() + get_pre_workload_cmds(helm_model_id, framework) + helm_apply_cmds( framework, hypercomputer, full_config_yaml_path, recipe_repo_root, get_docker_image(hypercomputer, framework), cluster_name=cluster, kueue_name=kueue_name, additional_cmds=gpu_helm_cmd, num_steps=num_steps, ) + wait_for_jobs_cmds() + copy_bucket_cmds_maxtext(tmpdir) + cleanup_cmds() ), ], cwd=tmpdir, ) assert result.exit_code == 0, f"Command failed with code {result.exit_code}" log_location = os.path.join(tmpdir, "tflog/metrics") mfu, step_time = calculate_maxtext_metrics(log_location, hypercomputer) print(f"mfu: {mfu}") print(f"step_time: {step_time}") write_run( model_id=model_id, hardware_id=hypercomputer, software_id=get_software_id(framework), number_of_nodes=num_gpus / 8, number_of_chips=num_gpus, container_image_name=get_image_version(framework), global_batch_size=batch_size_per_device * num_gpus, precision=precision, optimizer=optimizer, seq_length=sequence_length, median_step_time=step_time, e2e_time=step_time * num_steps, number_of_steps=num_steps, mfu=mfu, tokens_per_second=-1, writer_path=bq_writer_repo_root, topology="", comment="Regression tests", is_test=(False if composer_env.is_prod_env() else True), ) def get_software_id(framework: str): if framework == "maxtext": return "jax_maxtext" elif framework == "nemo": return "pytorch_nemo" else: return None def get_image_version(framework: str, model_id: Optional[str] = None): if framework == "maxtext": return "maxtext_nightly" elif framework == "nemo": if model_id == "llama3-1-405b": return "nemo24.12-A3U" else: return "nemo24.07-A3U" else: return None