# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Workload functions for AOTC reproducibility benchmarks."""
import os
import tempfile
import subprocess
import logging
from typing import Optional, Tuple, Dict, Any
from dataclasses import dataclass
from dags.map_reproducibility.utils.common_utils import (
namespace_cmds,
internal_wait_for_jobs_cmds,
cleanup_cmds,
helm_apply_cmds_internal_run,
get_internal_pre_workload_cmds,
get_internal_pre_workload_job_name,
get_bq_writer_path,
get_cluster,
calculate_maxtext_metrics,
copy_bucket_cmds_maxtext,
get_job_gcs_bucket_folder,
parse_internal_config_filename,
parse_internal_config_content,
get_patheon_job_link,
find_xprof_gcs_path,
get_skip_steps_for_metrics_calculation,
)
from dags.map_reproducibility.utils.benchmarkdb_utils import write_run
from dags.map_reproducibility.utils.constants import Optimizer, KUEUE_NAME, NUM_STEPS
# Configure logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
PROJECT = "supercomputer-testing"
BUCKET_NAME = "regression-testing-xlml"
@dataclass
class WorkloadResult:
"""Container for workload execution results."""
mfu: float
step_time: float
gcs_bucket: str
success: bool
error_message: Optional[str] = None
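# Illustrative only (hypothetical values): a successful run might be captured as
#   WorkloadResult(mfu=0.42, step_time=11.3, gcs_bucket="gs://bucket/job", success=True)
# and a failed one as
#   WorkloadResult(mfu=0.0, step_time=0.0, gcs_bucket="", success=False,
#                  error_message="helm install failed")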
def get_values_file_path(
base_recipe_repo_root: str,
config_yaml_name: str,
hypercomputer: str,
framework: str,
) -> str:
"""Determine the appropriate values file path.
Args:
base_recipe_repo_root: Root directory of the recipe repository
config_yaml_name: Name of the config YAML file
hypercomputer: Type of hypercomputer
framework: Framework name
Returns:
Path to the values file
"""
# Default values file based on hypercomputer and framework
values_name = f"{hypercomputer}_{framework}_values"
values_file_path = f"{base_recipe_repo_root}/values/{values_name}.yaml"
# Check for model-specific values file
model_specific_values_file_path = (
f"{base_recipe_repo_root}/values/{config_yaml_name}_values.yaml"
)
if os.path.exists(model_specific_values_file_path):
# Use model-specific values file
values_file_path = model_specific_values_file_path
logger.info(f"Using values file: {values_file_path}")
return values_file_path
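# Illustrative sketch (hypothetical repo root and names): calling
#   get_values_file_path(
#       "/tmp/recipes", "a3ultra_llama3-70b_bf16", "a3ultra", "maxtext"
#   )
# returns "/tmp/recipes/values/a3ultra_llama3-70b_bf16_values.yaml" when that
# model-specific file exists, and otherwise falls back to the default
# "/tmp/recipes/values/a3ultra_maxtext_values.yaml".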
def execute_workload_commands(commands: list, cwd: str) -> Tuple[bool, list]:
"""Execute shell commands and capture their outputs.
Args:
commands: List of shell commands to execute
cwd: Current working directory
Returns:
Tuple of (success, list of command results)
"""
logger.info(f"Executing commands: {commands} in directory: {cwd}")
  # Join commands with semicolons for sequential execution. A failing command
  # does not stop the ones after it, and the reported exit code is that of the
  # last command in the chain.
  combined_command = ";".join(commands)
# Run the combined command
process = subprocess.Popen(
["bash", "-c", combined_command],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
cwd=cwd,
)
# Capture output
stdout, stderr = process.communicate()
exit_code = process.returncode
# Create result for the combined execution
command_results = [{
"command": combined_command,
"stdout": stdout,
"stderr": stderr,
"output": stdout + ("\n\nSTDERR:\n" + stderr if stderr else ""),
"exit_code": exit_code,
}]
# Log results
if stdout:
logger.info(f"Stdout for combined commands:\n{stdout}")
if stderr:
logger.warning(f"Stderr for combined commands:\n{stderr}")
if exit_code != 0:
logger.error("Command execution failed")
return exit_code == 0, command_results
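# Illustrative sketch (hypothetical commands): the call
#   success, results = execute_workload_commands(
#       ["export FOO=bar", "echo $FOO"], "/tmp"
#   )
# runs `bash -c "export FOO=bar;echo $FOO"` in /tmp and returns success=True
# along with a single results entry holding the combined command, stdout,
# stderr, the merged output, and the exit code.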
def sample_job_configure_project_and_cluster(cluster: str, cluster_region: str):
  """Return gcloud commands to set the project and get cluster credentials."""
  commands = (
      f"gcloud config set project {PROJECT}",
      "gcloud container clusters get-credentials "
      f"{cluster} --region {cluster_region}",
  )
  return commands
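# Illustrative sketch (hypothetical cluster values): with cluster="my-cluster"
# and cluster_region="us-central1", the returned tuple is
#   ("gcloud config set project supercomputer-testing",
#    "gcloud container clusters get-credentials my-cluster --region us-central1")
# and is prepended to the other command groups in run_internal_sample_aotc_workload.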
def sample_workload_gcs_to_cns_cmds(log_file_in_gcs, output_file=None):
  """Return shell commands that copy a GCS log file to CNS via fileutil_bs.

  Note: this only works on glinux or cloudtop because it relies on fileutil_bs.
  If output_file is not provided, the basename of the input file is used.
  """
  log_file_in_gcs = log_file_in_gcs.removeprefix("gs://")
  if not output_file:
    output_file = os.path.basename(log_file_in_gcs)
  print(f"output_file name is: {output_file}")
cmds = (
f"LOG_FILE_IN_GCS={log_file_in_gcs} ",
f"filename={output_file} ",
"CNS_PATH=/cns/pi-d/home/${USER}/tensorboard/multislice ",
"/google/data/ro/projects/cloud/bigstore/mpm/fileutil_bs/stable/bin/fileutil_bs cp /bigstore/${LOG_FILE_IN_GCS} ${CNS_PATH}/${filename} ",
"echo file to put into xprof: ${CNS_PATH}/${filename}",
)
return cmds
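# Illustrative sketch (hypothetical GCS path): for "gs://my-bucket/job/xplane.pb"
# the returned commands set LOG_FILE_IN_GCS, filename, and CNS_PATH, then copy
# /bigstore/my-bucket/job/xplane.pb to ${CNS_PATH}/xplane.pb with fileutil_bs.
# They are run through execute_workload_commands(), whose single bash process
# lets the variable assignments carry over to the copy command.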
def write_run_results(
config: Any,
result: WorkloadResult,
docker_image: str,
bq_writer_repo_root: str,
job_name: str,
test_run: bool,
) -> None:
"""Write run results to the benchmark database.
Args:
config: Configuration object
result: WorkloadResult object
docker_image: Docker image name
bq_writer_repo_root: Path to BigQuery writer repository
job_name: Name of the job
test_run: Whether this is a test run
"""
write_run(
model_id=config.HELM_NAME_MODEL_ID,
hardware_id=config.HYPERCOMPUTER,
software_id=config.SOFTWARE_ID,
number_of_nodes=config.NUM_GPUS / 8,
number_of_chips=config.NUM_GPUS,
container_image_name=docker_image,
global_batch_size=config.per_device_batch_size * config.NUM_GPUS,
precision=config.PRECISION,
optimizer=Optimizer.ADAM,
seq_length=config.max_target_length,
median_step_time=result.step_time,
e2e_time=result.step_time * NUM_STEPS,
number_of_steps=NUM_STEPS,
mfu=result.mfu,
tokens_per_second=1, # Consider calculating this properly
writer_path=bq_writer_repo_root,
run_type="sample_perf_regression",
topology="",
comment="sample benchmarking run",
is_test=test_run,
logs_profile=result.gcs_bucket,
workload_others=str(config),
experiment_id=job_name,
)
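# Illustrative sketch (hypothetical arguments): with a parsed `config` and a
# WorkloadResult in hand, a caller could persist the run with
#   write_run_results(config, result, docker_image="maxtext-nightly",
#                     bq_writer_repo_root="/tmp/bq_writer",
#                     job_name="sample-job-123", test_run=True)
# which forwards the metrics to write_run() with run_type="sample_perf_regression".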
def run_internal_sample_aotc_workload(
relative_config_yaml_path: str,
base_recipe_repo_root: str,
timeout: int,
image_version: str,
sample_run_bucket_name: str,
) -> Dict[str, Any]:
"""Run the internal sample AOTC workload.
Args:
relative_config_yaml_path: Relative path to config YAML
base_recipe_repo_root: Root directory of the recipe repository
test_run: Whether this is a test run
timeout: Timeout in seconds
image_version: Docker image version
Returns:
Dictionary with results
"""
# Parse config from filename
config_yaml_name = relative_config_yaml_path.rsplit("/", maxsplit=1)[
-1
].replace(".yaml", "")
config = parse_internal_config_filename(config_yaml_name)
# Get derived configuration
cluster, cluster_region = get_cluster(config.HYPERCOMPUTER)
docker_image = image_version
# Locate values file
values_file_path = get_values_file_path(
base_recipe_repo_root,
config_yaml_name,
config.HYPERCOMPUTER,
config.FRAMEWORK,
)
# Locate config yaml
full_config_yaml_path = f"{base_recipe_repo_root}/{relative_config_yaml_path}"
logger.info(f"Config YAML path: {full_config_yaml_path}")
# Parse the config content now that we have the file path
config = parse_internal_config_content(full_config_yaml_path, config=config)
job_name = get_internal_pre_workload_job_name(
model_id=config.MODEL_ID,
precision=config.PRECISION,
num_gpus=config.NUM_GPUS,
framework=config.FRAMEWORK,
cluster=config.HYPERCOMPUTER,
is_sample_run=True,
)
pantheon_link = get_patheon_job_link(
region=cluster_region, cluster_name=cluster, job_name=job_name
)
# Adjust timeout for the container
container_timeout = int(timeout) - 4
logger.info(f"Container timeout: {container_timeout}")
with tempfile.TemporaryDirectory() as tmpdir:
# Prepare commands
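    # Pipeline sketch (derived from the helper names): configure gcloud and
    # cluster credentials, set up the namespace, run the internal pre-workload
    # steps, deploy the workload via helm, wait for the jobs to finish, copy
    # the MaxText logs from GCS into tmpdir, and finally clean up cluster
    # resources.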
commands = (
sample_job_configure_project_and_cluster(cluster, cluster_region)
+ namespace_cmds()
+ get_internal_pre_workload_cmds(job_name=job_name)
+ helm_apply_cmds_internal_run(
config.FRAMEWORK,
config.HYPERCOMPUTER,
full_config_yaml_path,
base_recipe_repo_root,
values_file_path,
docker_image,
cluster_name=cluster,
kueue_name=KUEUE_NAME,
additional_cmds=f" --set workload.gpus={config.NUM_GPUS} ",
bucket_name=sample_run_bucket_name,
)
+ internal_wait_for_jobs_cmds(timeout=container_timeout)
+ copy_bucket_cmds_maxtext(tmpdir, bucket_name=sample_run_bucket_name)
+ cleanup_cmds()
)
# Execute commands
success, error_message = execute_workload_commands(commands, tmpdir)
# Process results
if success:
bq_writer_repo_root = get_bq_writer_path(tmpdir)
log_location = os.path.join(tmpdir, "tflog/metrics")
comment = "sample benchmarking run"
gcs_bucket = get_job_gcs_bucket_folder(
job_name, bucket_name=sample_run_bucket_name
)
print(f"GCS bucket is {gcs_bucket}")
logs_profile = None
if hasattr(config, "profiler"):
logs_profile = find_xprof_gcs_path(gcs_bucket)
if not logs_profile:
logger.error(f"No xprof file found in {gcs_bucket}")
else:
print(f"logs_profile is {logs_profile}")
profiler_cmds = sample_workload_gcs_to_cns_cmds(logs_profile)
profile_success, profiler_error_message = execute_workload_commands(
profiler_cmds, tmpdir
)
if not profile_success:
logger.error(
f"Profile command failed with error: {profiler_error_message}"
)
# calculate mfu based on the config
skip_first_n_steps = get_skip_steps_for_metrics_calculation(config)
mfu, step_time = calculate_maxtext_metrics(
log_location,
config.HYPERCOMPUTER,
skip_first=skip_first_n_steps,
)
print(f"mfu: {mfu}")
print(f"step_time: {step_time}")
write_run(
model_id=config.HELM_NAME_MODEL_ID,
hardware_id=config.HYPERCOMPUTER,
software_id=config.SOFTWARE_ID,
number_of_nodes=config.NUM_GPUS / 8,
number_of_chips=config.NUM_GPUS,
container_image_name=docker_image,
global_batch_size=config.per_device_batch_size * config.NUM_GPUS,
precision=config.PRECISION,
optimizer=Optimizer.ADAM,
seq_length=config.max_target_length,
median_step_time=step_time,
e2e_time=step_time * NUM_STEPS,
number_of_steps=NUM_STEPS,
mfu=mfu,
tokens_per_second=1,
writer_path=bq_writer_repo_root,
run_type="sample_helm_workload",
topology="",
comment=comment,
is_test=True,
logs_profile=logs_profile,
gcs_metrics_bucket=gcs_bucket,
workload_others=str(config),
experiment_id=job_name,
)