# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
# Portions taken from <repo>, Copyright Nvidia Corporation
import math
import os
import random
import string
import sys
from typing import Tuple
from validations_wrapper import validate_config

LAUNCHER_SCRIPT_PATH = (
    f"{os.path.dirname(os.path.abspath(__file__))}/launcher/nemo/nemo_framework_launcher/launcher_scripts/"
)
sys.path.append(LAUNCHER_SCRIPT_PATH)
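# The vendored NeMo Framework Launcher scripts are added to sys.path so that the
# "nemo_launcher" imports below resolve against the copy shipped with this repository.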
import hydra
import omegaconf
from nemo_launcher.core.data_curation_stages import QualityFiltering
from nemo_launcher.core.data_stages import (
    CustomDataPreparation,
    MC4DataPreparation,
    PileDataPreparation,
)
from nemo_launcher.core.export_stages import Export
from nemo_launcher.core.rlhf_stages import RLHFPPO, RLHFRewardModel
from nemo_launcher.core.stages import (
    PEFT,
    AdapterLearning,
    Conversion,
    EvalHarnessEvaluation,
    FineTuning,
    IA3Learning,
    NeMoEvaluation,
    PromptLearning,
)
from launcher.accelerator_devices import (
    get_num_accelerator_devices,
    get_num_cores_per_accelerator,
)
from launcher.nemo.recipe_stages import (
    NeMoTraining,
    SMTrainingGPURecipe,
    SMTrainingTrainiumRecipe,
)
from launcher.nemo.stages import (
    SMCustomTrainingCPU,
    SMCustomTrainingGPU,
    SMCustomTrainingTrainium,
    get_instance_type,
)
omegaconf.OmegaConf.register_new_resolver("multiply", lambda x, y: x * y, replace=True)
omegaconf.OmegaConf.register_new_resolver("divide_ceil", lambda x, y: int(math.ceil(x / y)), replace=True)
omegaconf.OmegaConf.register_new_resolver("divide_floor", lambda x, y: int(math.floor(x / y)), replace=True)

STR2STAGECLASS = {
    "training": NeMoTraining,
    "fine_tuning": FineTuning,
    "peft": PEFT,
    "prompt_learning": PromptLearning,
    "adapter_learning": AdapterLearning,
    "ia3_learning": IA3Learning,
    "conversion": Conversion,
    "export": Export,
    "evaluation": {
        EvalHarnessEvaluation: ["gpt3", "prompt_gpt3", "llama", "prompt_llama"],
        NeMoEvaluation: [
            "t5",
            "mt5",
            "prompt_t5",
            "prompt_mt5",
            "adapter_t5",
            "adapter_gpt3",
            "ia3_t5",
            "ia3_gpt3",
            "peft_llama",
        ],
    },
    "data_preparation": {
        PileDataPreparation: ["gpt3", "t5", "bert", "llama"],
        MC4DataPreparation: ["mt5"],
        CustomDataPreparation: ["generic"],
    },
    "rlhf_rm": RLHFRewardModel,
    "rlhf_ppo": RLHFPPO,
    "quality_filtering": QualityFiltering,
}
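
# Note on the dict-valued entries above ("evaluation" and "data_preparation"): main() picks the
# concrete stage class from the model-type prefix of the corresponding "<stage_name>_config"
# choice (the part before "/"). For example, a hypothetical choice such as
# ``data_preparation_config: gpt3/<some_dataset>`` resolves to PileDataPreparation because
# "gpt3" appears in its model-type list.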


def get_training_stage(cfg):
    """
    Get the right training stage based on the device type and whether it is custom training
    """
    instance_type = get_instance_type(cfg)
    is_custom = cfg.get("training_cfg") is not None

    # p and g instances are GPU instances
    if instance_type.startswith(("p", "g")):
        device_type = "gpu"
    elif instance_type.startswith("trn"):
        device_type = "trainium"
    else:
        device_type = "cpu"

    if not is_custom:
        if device_type == "gpu":
            return SMTrainingGPURecipe
        if device_type == "trainium":
            return SMTrainingTrainiumRecipe
        raise ValueError("Recipes can only be run on GPU or Trainium instances")
    else:
        if device_type == "gpu":
            return SMCustomTrainingGPU
        if device_type == "trainium":
            return SMCustomTrainingTrainium
        return SMCustomTrainingCPU
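
# For reference (instance family names below are examples, not an exhaustive list): "p"/"g"
# instance types such as p5.48xlarge or g5.12xlarge select the GPU stages, "trn" types such as
# trn1.32xlarge select the Trainium stages, and anything else falls back to the CPU stage,
# which is only available for custom (non-recipe) training.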


def valid_run_name(run_name) -> str:
    # Generate a random 5-character alphanumeric hash
    random_hash = "-" + "".join(random.choices(string.ascii_lowercase + string.digits, k=5))

    # validate run name was provided
    if run_name is None:
        return "my-example-run" + f"{random_hash}"

    # Truncate original name to align with k8s naming requirements
    return run_name[:47] + f"{random_hash}"
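
# Example behavior (the 5-character suffix is random; the values shown are illustrative only):
#   valid_run_name("llama3-8b-pretrain") -> "llama3-8b-pretrain-k3x9q"
#   valid_run_name(None)                 -> "my-example-run-a1b2c"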


def preprocess_config(cfg) -> Tuple[bool, bool]:
    """
    Pre-process the configuration passed to the job

    Returns
    -------
    Tuple
        boolean: configuration has a custom script
        boolean: is it a SageMaker recipe
    """
    with omegaconf.open_dict(cfg):
        cfg.launcher_scripts_path = LAUNCHER_SCRIPT_PATH

    # Override the cluster type to align with NeMo
    if cfg.get("cluster_type") is None:
        assert cfg.get("cluster") is not None
        cluster_type = cfg.cluster.cluster_type
    else:
        cluster_type = cfg.cluster_type
    with omegaconf.open_dict(cfg):
        if cluster_type == "slurm":
            cfg.cluster_type = "bcm"
        else:
            cfg.cluster_type = cluster_type

    if cfg.get("wandb_api_key_file") is None:
        with omegaconf.open_dict(cfg):
            cfg.wandb_api_key_file = None

    if cfg.get("wandb_api_bcp_secret_key") is None:
        with omegaconf.open_dict(cfg):
            cfg.wandb_api_bcp_secret_key = None

    if cfg.get("training_cfg") is not None:
        assert cfg.get("stages") is None, "training_cfg and stages should not be set together"
        stage_cfg = cfg.get("training_cfg")
        assert stage_cfg.get("run") is not None, "run config should be set"
        run_config = stage_cfg.get("run")

        if run_config.get("ntasks_per_node") is not None:
            ntasks_per_node = run_config.get("ntasks_per_node")
        else:
            instance_type = get_instance_type(cfg)
            if instance_type is not None and get_num_accelerator_devices(instance_type) is not None:
                ntasks_per_node = get_num_accelerator_devices(instance_type) * get_num_cores_per_accelerator(
                    instance_type
                )
            else:
                ntasks_per_node = 8
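
        # Illustrative arithmetic (device counts are assumptions, not values looked up here): on an
        # instance exposing 16 accelerator devices with 2 cores per device, this yields
        # ntasks_per_node = 16 * 2 = 32; when no device information is available it falls back to 8.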
        # To align with https://github.com/NVIDIA/NeMo-Framework-Launcher/blob/23.11/launcher_scripts/nemo_launcher/core/stages.py#L721
        with omegaconf.open_dict(stage_cfg):
            stage_cfg.trainer = {"devices": ntasks_per_node}
        with omegaconf.open_dict(run_config):
            run_config.ntasks_per_node = ntasks_per_node
            run_config.results_dir = f"{cfg.base_results_dir}/{run_config.name}"

        # To align with https://github.com/NVIDIA/NeMo-Framework-Launcher/blob/23.11/launcher_scripts/nemo_launcher/core/stages.py#L313C54-L313C72
        with omegaconf.open_dict(cfg):
            cfg.training = {"model": {"ub_tp_comm_overlap": False}}

        # If not in a unit-test environment, de-dupe consecutive runs by appending a random hash to the end of the job name
        if "pytest" not in sys.modules and "name" in cfg.training_cfg.run:
            cfg.training_cfg.run.name = valid_run_name(cfg.training_cfg.run.get("name", None))

        return True, False

    if cfg.recipes:
        model_type = cfg.recipes.run.get("model_type", None)

        # If not in a unit-test environment, de-dupe consecutive runs by appending a random hash to the end of the job name
        if "pytest" not in sys.modules and "name" in cfg.recipes.run:
            cfg.recipes.run.name = valid_run_name(cfg.recipes.run.get("name", None))

        with omegaconf.open_dict(cfg):
            cfg.training = cfg.recipes  # Point cfg.training to cfg.recipes to avoid conflicts in NeMo stages

        # Hugging Face ("hf") model types are handled by the SageMaker recipe training stages
        if "hf" in model_type:
            return False, True

    return False, False


@hydra.main(config_path="recipes_collection", config_name="config", version_base="1.2")
@validate_config
def main(cfg):
    has_custom_script, has_sm_recipe = preprocess_config(cfg)

    if has_custom_script:
        stage_class = get_training_stage(cfg)
        stage = stage_class(cfg)
        job_id = stage.run()
    else:
        requested_stages = cfg.get("stages") or ["training"]
        dependency = None
        for stage_name in requested_stages:
            # Get our training stages
            if stage_name == "training" and has_sm_recipe:
                stage_class = get_training_stage(cfg)
            else:
                stage_class = STR2STAGECLASS[stage_name]
            if isinstance(stage_class, dict):
                stage_config_choice = cfg.get(f"{stage_name}_config")
                choice_model_type = stage_config_choice.rsplit("/", 1)[0]
                for cls, model_types in stage_class.items():
                    if choice_model_type in model_types:
                        stage_class = cls
                        break

            if dependency is not None:
                cfg[stage_name]["run"]["dependency"] = dependency
            stage = stage_class(cfg)
            job_id = stage.run()

            # Record the exact launcher command used for this job
            job_path = stage.get_job_path()
            command = " \\\n ".join(sys.argv)
            with open(job_path.folder / "launcher_cmd.log", "w") as f:
                f.write(command)

            # Chain the next stage on this job via an "afterany" dependency (run after it terminates in any state)
            if job_id:
                dependency = f"afterany:{job_id}"


if __name__ == "__main__":
    main()