chatlearn/utils/vllm_utils.py

# Copyright 2024 Alibaba Group Holding Limited. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""vllm utils"""

import argparse
import glob
import operator
import os
import re
import random
import subprocess
import sys

from datetime import timedelta
from functools import reduce

import numpy as np
import torch
import torch.distributed

from chatlearn.models.vllm import is_vllm_v2
from chatlearn.utils.constant import CURRENT_VLLM_VERSION, VLLMVersion
from chatlearn.utils.utils import get_use_legacy_models

try:
    from chatlearn.utils.megatron_import_helper import update_num_microbatches
    from chatlearn.utils.megatron_import_helper import find_checkpoint_rank_0
    from chatlearn.utils.megatron_import_helper import fix_query_key_value_ordering
    from chatlearn.utils.megatron_import_helper import get_checkpoint_tracker_filename
    from chatlearn.utils.megatron_import_helper import get_checkpoint_version
    from chatlearn.utils.megatron_import_helper import set_checkpoint_version
    from chatlearn.utils.megatron_import_helper import read_metadata
    from chatlearn.utils.megatron_import_helper import unwrap_model
except ImportError:
    print("Cannot import megatron, please set megatron python path first.")

try:
    from chatlearn.utils.vllm_import_helper import init_world_group
except ImportError:
    print("Cannot import init_world_group for vLLM 0.5.1, please install vLLM 0.5.1 first.")

try:
    from chatlearn.utils.vllm_import_helper import get_pipeline_model_parallel_rank
    from chatlearn.utils.vllm_import_helper import get_pipeline_model_parallel_world_size
    from chatlearn.utils.vllm_import_helper import _set_default_torch_dtype
    from chatlearn.utils.vllm_import_helper import parallel_state as mpu
    from chatlearn.utils.vllm_import_helper import initialize_model_parallel
    from chatlearn.utils.vllm_import_helper import initialize_dummy_weights
except ImportError:
    print("Cannot import vllm, please install vllm 0.3.0 or 0.5.1 first.")

try:
    from chatlearn.utils.vllm_import_helper import get_pp_indices
except ImportError:
    print("Cannot import vllm, please install vllm 0.6.3 first.")

from .constant import QwenVersion

# The simple map of names for "automated" rules.
megatron_to_transformers = { "self_attention.dense": ".self_attn.o_proj.", "mlp.dense_h_to_4h": ".mlp.gate_up_proj.", "mlp.dense_4h_to_h": ".mlp.down_proj.", "mlp.gate_up_proj": ".mlp.gate_up_proj.", "mlp.down_proj": ".mlp.down_proj.", "self_attention.rotary_emb":".self_attn.rotary_emb.inv_freq", "self_attention.query_key_value": ".self_attn.qkv_proj", "attention.query_key_value": ".self_attn.qkv_proj", } megatron_qwen_to_transformers = { "attention.attention_layernorm": ".attn.attention_layernorm.", "attention.dense": ".attn.c_proj.", "self_attention.dense": ".attn.c_proj.", "mlp.dense_h_to_4h": ".mlp.c_fc.", "mlp.w1": ".mlp.gate_up_proj.", "mlp.w2": ".mlp.gate_up_proj.", "mlp.dense_4h_to_h": ".mlp.c_proj.", "mlp.dense_layernorm": "mlp.dense_layernorm", } megatron_qwen2_to_transformers = { "attention.attention_layernorm": ".attn.attention_layernorm.", "attention.dense": ".attn.c_proj.", "self_attention.dense": ".self_attn.o_proj.", "mlp.dense_h_to_4h": ".mlp.gate_up_proj.", "mlp.w1": ".mlp.gate_up_proj.", "mlp.w2": ".mlp.gate_up_proj.", "mlp.dense_4h_to_h": ".mlp.down_proj.", "mlp.dense_layernorm": ".mlp.dense_layernorm.", "mlp.router.layer": ".mlp.gate.", "mlp.experts.dense_h_to_4h": ".mlp.experts.w13_weight", "mlp.experts.dense_4h_to_h": ".mlp.experts.w2_weight", "mlp.shared_experts.dense_h_to_4h": ".mlp.shared_expert.gate_up_proj.", "mlp.shared_experts.dense_4h_to_h": ".mlp.shared_expert.down_proj.", "mlp.gate": ".mlp.shared_expert_gate." } mcore_to_transformers = { "self_attention.linear_proj": ".self_attn.o_proj.", "mlp.linear_fc1": ".mlp.gate_up_proj.", "mlp.linear_fc2": ".mlp.down_proj.", } class ParameterSyncMap: """Base ParameterSyncMap.""" def __init__(self, src_names, layer_offset): self.weight_or_bias = ["weight", "bias"] self.src_names = src_names self.layer_offset = layer_offset self._dst_names = [] @property def embedding_sync_map(self): return self._embedding_sync_map @property def layer_sync_map(self): return self._layer_sync_map @property def final_layer_sync_map(self): return self._final_layer_sync_map @property def concat_params_dict(self): return self._concat_params_dict @property def to_fix_shared_expert_ordering(self): return self._to_fix_shared_expert_ordering @property def to_allgather_roututed_experts_dict(self): return self._to_allgather_routed_experts_dict @property def to_alltoall_roututed_experts_dict(self): return self._to_alltoall_routed_experts_dict @property def to_fix_act_ordering_dict(self): return self._to_fix_act_ordering_dict @property def to_fix_qkv_ordering_dict(self): return self._to_fix_qkv_ordering_dict @property def dst_names(self): if not self._dst_names: self.map_src_to_dst() return self._dst_names def map_src_to_dst(self): raise RuntimeError("Must be implemented by subclass.") def get_dst_name(self, sync_map, src_name): assert src_name in sync_map, f"expect {src_name} in {sync_map}" return sync_map[src_name] class Megatron2LlamaSyncMap(ParameterSyncMap): """sync map:megatron to llama transformer""" def __init__(self, src_names, layer_offset): src_prefix = "module.module.language_model" dst_prefix = "model" if is_vllm_v2() else "model.model" # The regex to extract layer names. 
self.layer_re = re.compile(rf"{src_prefix}.encoder.layers\.(\d+)\.([a-z0-9_.]+)\.([a-z]+)") self.src_prefix = src_prefix self.dst_prefix = dst_prefix self._embedding_sync_map = { f"{src_prefix}.embedding.word_embeddings.weight": f"{dst_prefix}.embed_tokens.weight" } self._layer_sync_map = { "self_attention.dense": ".self_attn.o_proj.", "mlp.dense_h_to_4h": ".mlp.gate_up_proj.", "mlp.dense_4h_to_h": ".mlp.down_proj.", "self_attention.rotary_emb":".self_attn.rotary_emb.inv_freq", } self._final_layer_sync_map = { f"{src_prefix}.encoder.final_norm.weight": f"{dst_prefix}.norm.weight", f"{src_prefix}.output_layer.weight": "lm_head.weight" if is_vllm_v2() else "model.lm_head.weight" } self._concat_params_dict = None self._to_fix_shared_expert_ordering = None self._to_allgather_routed_experts_dict = None self._to_alltoall_routed_experts_dict = None self._to_fix_act_ordering_dict = None self._to_fix_qkv_ordering_dict = { "modules": [ "attention.query_key_value", "self_attention.query_key_value" ], "layer_re": self.layer_re } super().__init__(src_names, layer_offset) def map_src_to_dst(self): for src_name in self.src_names: # convert word embeddings. if src_name in self.embedding_sync_map: self._dst_names.append(self.get_dst_name(self.embedding_sync_map, src_name)) continue # final layer if src_name in self.final_layer_sync_map: self._dst_names.append(self.get_dst_name(self.final_layer_sync_map, src_name)) continue m = self.layer_re.match(src_name) # Stop if that's not a layer if m is None: raise RuntimeError(f"expect src_name to be a layer, while {src_name}") # The index of the layer. layer_idx = int(m.group(1)) + self.layer_offset # The name of the operation. op_name = m.group(2) # Is it a weight or a bias? weight_or_bias = m.group(3) # The name of the layer. layer_name = f"{self.dst_prefix}.layers.{layer_idx}" # For layernorm(s), simply store the layer norm. if op_name.endswith("_norm") and weight_or_bias == 'weight': ln_name = "input_layernorm" if op_name.startswith("input") else "post_attention_layernorm" self._dst_names.append(layer_name + "." + ln_name + "." + weight_or_bias) # Transpose the QKV matrix. elif op_name in ["attention.query_key_value", "self_attention.query_key_value"] and \ weight_or_bias == "weight": self._dst_names.append(layer_name + ".self_attn.qkv_proj.weight") # Transpose the weights. elif weight_or_bias == "weight": out_name = self.get_dst_name(self.layer_sync_map, op_name) self._dst_names.append(layer_name + out_name + "weight") # Copy the bias. # Ignore them elif weight_or_bias == "bias": pass # Copy the Rotary Embedding else: out_name = self.get_dst_name(self.layer_sync_map, op_name) self._dst_names.append(layer_name + out_name) class MCore2LlamaSyncMap(ParameterSyncMap): """sync map:megatron-core to llama transformer""" def __init__(self, src_names, layer_offset): src_prefix = "module.module" dst_prefix = "model" if is_vllm_v2() else "model.model" # The regex to extract layer names. self.layer_re = re.compile(rf"{src_prefix}.decoder.layers\.(\d+)\.([a-z0-9_.]+)[\._]([a-z]+)") self.src_prefix = src_prefix self.dst_prefix = dst_prefix # vLLM skips loading rotary_pos_emb and re-initializes it. Thus, we don't synchronize it from MCore to vllm. 
self._embedding_sync_map = { f"{src_prefix}.embedding.word_embeddings.weight": f"{dst_prefix}.embed_tokens.weight", } self._layer_sync_map = { "self_attention.linear_proj": ".self_attn.o_proj.", "mlp.linear_fc1": ".mlp.gate_up_proj.", "mlp.linear_fc2": ".mlp.down_proj.", } self._final_layer_sync_map = { f"{src_prefix}.decoder.final_layernorm.weight": f"{dst_prefix}.norm.weight", f"{src_prefix}.output_layer.weight": "lm_head.weight" if is_vllm_v2() else "model.lm_head.weight" } self._concat_params_dict = None self._to_fix_shared_expert_ordering = None self._to_allgather_routed_experts_dict = None self._to_alltoall_routed_experts_dict = None self._to_fix_act_ordering_dict = None self._to_fix_qkv_ordering_dict = { "modules": [ "self_attention.linear_qkv", ], "layer_re": self.layer_re } super().__init__(src_names, layer_offset) def map_src_to_dst(self): for src_name in self.src_names: # convert word embeddings. if src_name in self.embedding_sync_map: self._dst_names.append(self.get_dst_name(self.embedding_sync_map, src_name)) continue # final layer if src_name in self.final_layer_sync_map: self._dst_names.append(self.get_dst_name(self.final_layer_sync_map, src_name)) continue m = self.layer_re.match(src_name) # Stop if that's not a layer if m is None: raise RuntimeError(f"expect src_name ({src_name}) to be a layer") # The index of the layer. layer_idx = int(m.group(1)) + self.layer_offset # The name of the operation. op_name = m.group(2) # Is it a weight or a bias? weight_or_bias = m.group(3) # The name of the layer. layer_name = f"{self.dst_prefix}.layers.{layer_idx}" # For layernorm(s), simply store the layer norm. if op_name.endswith("layer_norm") and weight_or_bias == 'weight': if op_name == "self_attention.linear_qkv.layer_norm": ln_name = "input_layernorm" elif op_name == "mlp.linear_fc1.layer_norm": ln_name = "post_attention_layernorm" else: assert False, f"expect op_name ({op_name}) to be layer norm" self._dst_names.append(layer_name + "." + ln_name + "." + weight_or_bias) # Transpose the QKV matrix. elif op_name == "self_attention.linear_qkv" and weight_or_bias == 'weight': self._dst_names.append(layer_name + ".self_attn.qkv_proj.weight") # Transpose the weights. elif weight_or_bias == "weight": out_name = self.get_dst_name(self.layer_sync_map, op_name) self._dst_names.append(layer_name + out_name + "weight") # Ignore biases and extra_states. elif weight_or_bias in ["bias", "_extra_state"]: pass # Copy the rest. else: out_name = self.get_dst_name(self.layer_sync_map, op_name) self._dst_names.append(layer_name + out_name) class Megatron2QWenSyncMap(ParameterSyncMap): """sync map:megatron to qwen transformer""" def __init__(self, src_names, layer_offset, qwen_version=QwenVersion.v_1): self.qwen_version = qwen_version src_prefix = "module.module.language_model" # configuration for different versions of qwen if qwen_version == QwenVersion.v_1: dst_prefix = "model.transformer" embed_name = "wte" att_dense_name = ".attn.c_proj." self.layer_prefix = "h" mlp_dense_name = ".mlp.c_proj." final_norm = "ln_f" elif qwen_version == QwenVersion.v_2: dst_prefix = "model" if is_vllm_v2() else "model.model" embed_name = "embed_tokens" att_dense_name = ".self_attn.o_proj." self.layer_prefix = "layers" mlp_dense_name = ".mlp.down_proj." final_norm = "norm" else: raise RuntimeError(f"Unsupported qwen version {qwen_version}, only 1.0 or 2.0 for now.") # The regex to extract layer names. 
        self.layer_re = re.compile(rf"{src_prefix}.encoder.layers\.(\d+)\.([a-z0-9_.]+)\.([a-z]+)")
        self.src_prefix = src_prefix
        self.dst_prefix = dst_prefix
        self._embedding_sync_map = {
            f"{src_prefix}.embedding.word_embeddings.weight": f"{dst_prefix}.{embed_name}.weight",
            "module.module.word_embeddings.weight": f"{dst_prefix}.{embed_name}.weight"
        }
        self._layer_sync_map = {
            "attention.attention_layernorm": ".attn.attention_layernorm.",
            "attention.dense": ".attn.c_proj.",
            "self_attention.dense": att_dense_name,
            "mlp.dense_h_to_4h": ".mlp.gate_up_proj.",
            "mlp.w1": ".mlp.gate_up_proj.",
            "mlp.w2": ".mlp.gate_up_proj.",
            "mlp.dense_4h_to_h": mlp_dense_name,
            "mlp.dense_layernorm": "mlp.dense_layernorm",
            "mlp.router.layer": ".mlp.gate.",
            "mlp.experts.dense_h_to_4h": ".mlp.experts.w13_weight",
            "mlp.experts.dense_4h_to_h": ".mlp.experts.w2_weight",
            "mlp.shared_experts.dense_h_to_4h": ".mlp.shared_expert.gate_up_proj.",
            "mlp.shared_experts.dense_4h_to_h": ".mlp.shared_expert.down_proj.",
            "mlp.gate": ".mlp.shared_expert_gate."
        }
        self._final_layer_sync_map = {
            f"{src_prefix}.encoder.final_layernorm.bias": f"{dst_prefix}.{final_norm}.bias",
            f"{src_prefix}.encoder.final_layernorm.weight": f"{dst_prefix}.{final_norm}.weight",
            f"{src_prefix}.output_layer.weight": "lm_head.weight" if is_vllm_v2() else "model.lm_head.weight"
        }
        self._concat_params_dict = {
            "modules": ["mlp.w1", "mlp.w2"],
            "dim": 0
        }
        self._to_fix_shared_expert_ordering = {
            "modules": ["mlp.shared_experts.dense_h_to_4h"],
            "dim": 0
        }
        self._to_fix_act_ordering_dict = {
            "modules": ["mlp.dense_h_to_4h"],
            "dim": 0
        }
        self._to_fix_qkv_ordering_dict = {
            "modules": [
                "attention.query_key_value",
                "self_attention.query_key_value"
            ],
            "layer_re": self.layer_re
        }
        self._to_allgather_routed_experts_dict = {
            "modules": [
                "mlp.experts.dense_h_to_4h",
                "mlp.experts.dense_4h_to_h",
            ],
            "layer_re": self.layer_re
        }
        self._to_alltoall_routed_experts_dict = {
            "modules": [
                "mlp.experts.dense_h_to_4h",
                "mlp.experts.dense_4h_to_h",
            ],
            "layer_re": self.layer_re
        }
        # Reorder mlp.w1/mlp.w2 pairs so that w2 precedes w1 in the source name list.
        src_names_list = []
        for idx, s_name in enumerate(src_names):
            if "mlp.w1" in s_name:
                src_names_list.append(src_names[idx + 1])
                src_names_list.append(s_name)
            elif "mlp.w2" in s_name:
                continue
            else:
                src_names_list.append(s_name)
        super().__init__(src_names_list, layer_offset)

    def map_src_to_dst(self):
        for src_name in self.src_names:
            # convert word embeddings.
            if src_name in self.embedding_sync_map:
                self._dst_names.append(self.get_dst_name(self.embedding_sync_map, src_name))
                continue
            # final layer
            if src_name in self.final_layer_sync_map:
                self._dst_names.append(self.get_dst_name(self.final_layer_sync_map, src_name))
                continue
            m = self.layer_re.match(src_name)
            # Stop if that's not a layer
            if m is None:
                raise RuntimeError(f"expect src_name to be a layer, while {src_name}")
            # The index of the layer.
            layer_idx = int(m.group(1)) + self.layer_offset
            # The name of the operation.
            op_name = m.group(2)
            # Is it a weight or a bias?
            weight_or_bias = m.group(3)
            # The name of the layer.
            layer_name = f"{self.dst_prefix}.{self.layer_prefix}.{layer_idx}"
            # For layernorm(s), simply store the layer norm.
            if op_name.endswith("layernorm"):
                if self.qwen_version == QwenVersion.v_1:
                    if "attention." in op_name:
                        self._dst_names.append(
                            layer_name + self.get_dst_name(self.layer_sync_map, ".attn.attention_layernorm.") + weight_or_bias)
                    if "mlp." in op_name:
                        self._dst_names.append(
                            layer_name + self.get_dst_name(self.layer_sync_map, op_name) + weight_or_bias)
                if op_name.startswith("input"):
                    ln_name = "ln_1" if self.qwen_version == QwenVersion.v_1 else "input_layernorm"
                    self._dst_names.append(
                        layer_name + "." + ln_name + "." + weight_or_bias)
                elif op_name.startswith("post"):
                    ln_name = "ln_2" if self.qwen_version == QwenVersion.v_1 else "post_attention_layernorm"
                    self._dst_names.append(
                        layer_name + "." + ln_name + "." + weight_or_bias)
                elif self.qwen_version == QwenVersion.v_2:
                    raise RuntimeError(f"unsupported layernorm {op_name}.")
            elif op_name == "self_attention.rotary_emb":
                self._dst_names.append(layer_name + ".attn.rotary_emb.inv_freq")
            # Transpose the QKV matrix and the bias.
            elif op_name in ["attention.query_key_value", "self_attention.query_key_value"]:
                if self.qwen_version == QwenVersion.v_1:
                    dst_name = layer_name + f".attn.c_attn.{weight_or_bias}"
                else:
                    dst_name = layer_name + f".self_attn.qkv_proj.{weight_or_bias}"
                self._dst_names.append(dst_name)
            elif op_name in ["mlp.w1", "mlp.w2"]:
                out_name = self.layer_sync_map[op_name]
                gate_up_proj_name = layer_name + out_name + "weight"
                if gate_up_proj_name not in self._dst_names:
                    self._dst_names.append(gate_up_proj_name)
            elif op_name in ["mlp.shared_experts.dense_h_to_4h"]:
                out_name = self.layer_sync_map[op_name]
                gate_up_proj_name = layer_name + out_name + "weight"
                self._dst_names.append(gate_up_proj_name)
            elif "mlp.experts" in op_name:
                out_name = self.layer_sync_map[op_name]
                self._dst_names.append(layer_name + out_name)
            # Transpose the weights.
            elif weight_or_bias in ["weight", "bias"]:
                out_name = self.layer_sync_map[op_name]
                self._dst_names.append(layer_name + out_name + weight_or_bias)


def parse_args(extra_args_provider=None, ignore_unknown_args=False):
    """Parse all arguments."""
    parser = argparse.ArgumentParser(description='vLLM Arguments', allow_abbrev=False)

    # Custom arguments.
    if extra_args_provider is not None:
        parser = extra_args_provider(parser)

    # Parse.
    if ignore_unknown_args:
        args, _ = parser.parse_known_args()
    else:
        args = parser.parse_args()

    # Args from environment
    args.rank = int(os.getenv('RANK', '0'))
    args.local_rank = int(os.getenv('LOCAL_RANK', '0'))
    args.world_size = int(os.getenv("WORLD_SIZE", '1'))

    return args


def get_model(model_provider, args):
    with _set_default_torch_dtype(args.get("params_dtype")):
        # Create a model instance.
        # The weights will be initialized as empty tensors.
        model = model_provider()
        model = model.cuda()
        if args["load"]:
            model.load_weights()
        else:
            # For accurate performance evaluation, we assign
            # random values to the weights.
            initialize_dummy_weights(model)
    return model.eval()


def _init_distributed_environment(args):
    """Initialize the distributed environment."""
    device_count = torch.cuda.device_count()
    if torch.distributed.is_initialized():
        if args.rank == 0:
            print('torch distributed is already initialized, '
                  'skipping initialization ...', flush=True)
        args.rank = torch.distributed.get_rank()
        args.world_size = torch.distributed.get_world_size()
    else:
        if args.rank == 0:
            print('> initializing torch distributed ...', flush=True)
        # Manually set the device ids.
        if device_count > 0:
            device = args.rank % device_count
            if args.local_rank is not None:
                assert args.local_rank == device, \
                    'expected local-rank to be the same as rank % device-count.'
else: args.local_rank = device torch.cuda.set_device(device) if torch.distributed.is_initialized(): world_size = args.tensor_model_parallel_size * args.pipeline_model_parallel_size torch_world_size = torch.distributed.get_world_size() if torch_world_size != world_size: raise RuntimeError( "torch.distributed is already initialized but the torch world " "size does not match args.world_size " f"({torch_world_size} vs. {args.world_size}).") else: torch.distributed.init_process_group( backend=args.distributed_backend, world_size=args.world_size, rank=args.rank, timeout=timedelta(minutes=args.distributed_timeout_minutes)) if CURRENT_VLLM_VERSION in [VLLMVersion.v_0_5_1, VLLMVersion.v_0_6_3, VLLMVersion.v_0_6_6]: _WORLD = None if _WORLD is None: ranks = list(range(torch.distributed.get_world_size())) _WORLD = init_world_group(ranks, args.local_rank, args.distributed_backend) else: assert _WORLD.world_size == torch.distributed.get_world_size(), ( "world group already initialized with a different world size") mpu._WORLD = _WORLD initialize_model_parallel(args.tensor_model_parallel_size, args.pipeline_model_parallel_size) def initialize_vllm( # pylint: disable=dangerous-default-value,useless-return extra_args_provider=None, ignore_unknown_args=False, allow_no_cuda=False, args_dict=None ): """Set global variables, initialize distributed, and set autoresume and random seeds. `allow_no_cuda` should not be set unless using megatron for cpu only data processing. In general this arg should not be set unless you know what you are doing. Returns a function to finalize distributed env initialization (optionally, only when args.lazy_mpu_init == True) """ if not allow_no_cuda: # Make sure cuda is available. assert torch.cuda.is_available(), "Megatron requires CUDA." # Parse arguments args = parse_args(extra_args_provider, ignore_unknown_args) if args_dict: for key, value in args_dict.items(): if value == "None": value = None if hasattr(args, key): default_value = getattr(args, key) if default_value is not None and value is not None: default_type = type(default_value) if not isinstance(value, default_type): value = default_type(value) setattr(args, key, value) def finish_mpu_init(): # Pytorch distributed. _init_distributed_environment(args) # Random seeds for reproducibility. if args.rank == 0: print("> setting random seeds to {} ...".format(args.seed)) finish_mpu_init() return args def ensure_directory_exists(filename): """Build filename's path if it does not already exists.""" dirname = os.path.dirname(filename) if not os.path.exists(dirname): os.makedirs(dirname) def get_element_from_dict_by_path(d, path): """ Get element from dictionary by path. If element is not present, recursively add empty dictionaries. Args: d (dict): the dictionary to get the element from path (list): the path to the element which is delimited by "." 
""" path = path.split(".") for k in path: if k not in d: d[k] = {} d = d[k] return d def split_attn_state(param, num_heads, num_query_groups, kv_channels, hidden_size): nh = num_heads ng = num_query_groups dim = kv_channels if len(param.shape) == 1: param = param.view((ng, dim*nh//ng+dim*2, 1)) q_proj = param[:, :dim*nh//ng, :].reshape(-1).contiguous() k_proj = param[:, dim*nh//ng:dim*nh//ng+dim, :].reshape(-1).contiguous() v_proj = param[:, dim*nh//ng+dim:, :].reshape(-1).contiguous() else: param = param.view((ng, dim*nh//ng+dim*2, hidden_size)) q_proj = param[:, :dim*nh//ng, :].reshape(-1, hidden_size).contiguous() k_proj = param[:, dim*nh//ng:dim*nh//ng+dim, :].reshape(-1, hidden_size).contiguous() v_proj = param[:, dim*nh//ng+dim:, :].reshape(-1, hidden_size).contiguous() return torch.concat([q_proj, k_proj, v_proj], dim=0) def fix_qwen_query_key_value_ordering( param, checkpoint_version, num_splits, num_heads, hidden_size ): # Permutes layout of param tensor to [num_splits * num_heads * hidden_size, :] # for compatibility with later versions of NVIDIA Megatron-LM. # The inverse operation is performed inside Megatron-LM to read checkpoints: # https://github.com/NVIDIA/Megatron-LM/blob/v2.4/megatron/checkpointing.py#L209 # If param is the weight tensor of the self-attention block, the returned tensor # will have to be transposed one more time to be read by HuggingFace GPT2. input_shape = param.size() if checkpoint_version == 1.0: # version 1.0 stores [num_heads * hidden_size * num_splits, :] saved_shape = (num_heads, hidden_size, num_splits) + input_shape[1:] param = param.view(*saved_shape) param = param.transpose(0, 2) param = param.transpose(1, 2).contiguous() elif checkpoint_version >= 2.0: # other versions store [num_heads * num_splits * hidden_size, :] saved_shape = (num_heads, num_splits, hidden_size) + input_shape[1:] param = param.view(*saved_shape) param = param.transpose(0, 1).contiguous() param = param.view(*input_shape) return param def get_megatron_sharded_states(args, tp_size, pp_size, pp_rank): """ Get sharded checkpoints from NVIDIA Megatron-LM checkpoint based on the provided tensor parallel size, pipeline parallel size and pipeline parallel rank. Args: args (argparse.Namespace): the arguments to the script tp_size (int): the tensor parallel size pp_size (int): the pipeline parallel size pp_rank (int): the pipeline parallel rank """ tp_state_dicts = [] for i in range(tp_size): sub_dir_name = f"mp_rank_{i:02d}" if pp_size == 1 else f"mp_rank_{i:02d}_{pp_rank:03d}" checkpoint_name = glob.glob(os.path.join(args["load"], sub_dir_name) + "/model*.pt")[0] checkpoint_path = os.path.join(args["load"], sub_dir_name, checkpoint_name) state_dict = torch.load(checkpoint_path, map_location="cpu") tp_state_dicts.append(state_dict) return tp_state_dicts def get_load_dir(load_dir, is_load_dir_valid): possible_sub_dirs = ["mp_rank_00", "mp_rank_00_000"] sub_dirs = os.listdir(load_dir) rank0_checkpoint_name = None for sub_dir in sub_dirs: if sub_dir not in possible_sub_dirs: continue sub_dir = os.path.join(load_dir, sub_dir) if not os.path.exists(sub_dir): continue rank0_checkpoint_name = glob.glob(sub_dir + "/model*.pt") if len(rank0_checkpoint_name) == 0: continue is_load_dir_valid = True break return is_load_dir_valid, rank0_checkpoint_name def load_rank0_state_dict(args): # Load rank0 state dict from Megatron-LM checkpoint. 
if args["load_iteration"] is not None: load_iteration = args["load_iteration"] else: tracker_filename = get_checkpoint_tracker_filename(args["load"]) if not os.path.isfile(tracker_filename): raise RuntimeError(f"Invalid load dir {args['load']}, you need to specify args.load_iteration or latest_checkpointed_iteration.txt.") load_iteration, _ = read_metadata(tracker_filename) dirnames = os.listdir(args["load"]) dirnames.sort(reverse=True) is_load_dir_valid = False load_dir = None for dirname in dirnames: if not dirname.startswith("iter_"): continue if dirname == f"iter_{load_iteration:07d}": load_dir = os.path.join(args["load"], dirname) print(f"Trying to load Megatron-LM checkpoint from {load_dir}") assert os.path.exists(load_dir), f"expect load_dir not None for load_iteration {load_iteration}, while {load_dir}." is_load_dir_valid, rank0_checkpoint_name = get_load_dir(load_dir, is_load_dir_valid) if not is_load_dir_valid: raise RuntimeError(f"Invalid load dir {args['load']} with load_iteration {load_iteration}") args["load"] = load_dir rank0_checkpoint_path = rank0_checkpoint_name[0] print(f"Loading Megatron checkpoint arguments from: {rank0_checkpoint_path}") rank0_state_dict = torch.load(rank0_checkpoint_path, map_location="cpu") return rank0_state_dict def convert_llama_state_dict_from_megatron_to_vllm(args, hf_config, qwen_version=None): """Convert NVIDIA Megatron-LM state_dict to vLLM llama state_dict. Args: args (argparse.Namespace): the arguments to the script """ assert qwen_version is None, f"Expect qwen_version is None for Llama, while {qwen_version}" tp_rank = mpu.get_tensor_model_parallel_rank() pp_rank = get_pipeline_model_parallel_rank() state_dict = load_rank0_state_dict(args) megatron_args = state_dict.get("args", None) if "checkpoint_version" in state_dict.keys(): checkpoint_version = state_dict["checkpoint_version"] else: checkpoint_version = 0.0 if megatron_args is None: raise ValueError( "Megatron-LM checkpoint does not contain arguments. This utility only supports Megatron-LM checkpoints" " containing all the megatron arguments. This is because it loads all config related to model" " architecture, the tensor and pipeline model parallel size from the checkpoint insead of user having to" " manually specify all the details. Please save Megatron-LM checkpoint along with all the megatron" " arguments to use this utility." ) output_state_dict = {} tp_size = megatron_args.tensor_model_parallel_size pp_size = megatron_args.pipeline_model_parallel_size # The number of heads. heads = hf_config.num_attention_heads // tp_size # The hidden_size per head. hidden_size_per_head = hf_config.hidden_size // hf_config.num_attention_heads # The regex to extract layer names. layer_re = re.compile(r"layers\.(\d+)\.([a-z0-9_.]+)\.([a-z]+)") # Convert. print("Start to convert...") prefix_name = "model" if is_vllm_v2() else "model.model" # Embeddings print("Converting embeddings") tp_state_dicts = get_megatron_sharded_states(args, tp_size, pp_size, pp_rank) # Convert and store the position embeddings. position_embeddings = get_element_from_dict_by_path( tp_state_dicts[0], "model.language_model.embedding.position_embeddings.weight" ) if position_embeddings: output_state_dict["transformer.position_embeddings.weight"] = position_embeddings.to(hf_config.torch_dtype) # Convert and store the word embeddings. 
word_embeddings = get_element_from_dict_by_path(tp_state_dicts[tp_rank], "model.word_embeddings_for_head.weight") if isinstance(word_embeddings, dict): word_embeddings = get_element_from_dict_by_path( tp_state_dicts[tp_rank], "model.language_model.embedding.word_embeddings.weight" ) if isinstance(word_embeddings, dict): assert not word_embeddings, \ "weight name of word_embed expect 'model.word_embeddings_for_head.weight' \ or 'model.language_model.embedding.word_embeddings.weight'." elif word_embeddings is not None: # After training with megatron, word_embeddings is stored differently word_embeddings = word_embeddings.to(hf_config.torch_dtype) output_state_dict[f"{prefix_name}.embed_tokens.weight"] = word_embeddings # Reset the vocab size hf_config.vocab_size = word_embeddings.shape[0] # Transformer Layers print("Converting transformer layers") if CURRENT_VLLM_VERSION in [VLLMVersion.v_0_6_3, VLLMVersion.v_0_6_6]: start_layer_idx, _ = get_pp_indices( hf_config.num_hidden_layers, pp_rank, pp_size ) layer_offset = start_layer_idx else: assert pp_size == 1, f"expect pipeline parallel size eq 1 for vllm {CURRENT_VLLM_VERSION}" layer_offset = hf_config.num_hidden_layers // pp_size * pp_rank # The transformer. path = ( "model.language_model.transformer" if "transformer" in get_element_from_dict_by_path(tp_state_dicts[0], "model.language_model").keys() else "model.language_model.encoder" ) # Extract the layers. for key, val in get_element_from_dict_by_path(tp_state_dicts[tp_rank], path).items(): # skip None value. # TODO(jiangle.jl): whether to process empty value. if val is None: continue # Match the name. m = layer_re.match(key) # Stop if that's not a layer if m is None: break # The index of the layer. layer_idx = int(m.group(1)) + layer_offset # The name of the operation. op_name = m.group(2) # Is it a weight or a bias? weight_or_bias = m.group(3) # The name of the layer. layer_name = f"{prefix_name}.layers.{layer_idx}" params = val.to(hf_config.torch_dtype) # For layernorm(s), simply store the layer norm. if (op_name.endswith("_norm") or op_name.endswith("_layernorm")) and weight_or_bias == 'weight': ln_name = "input_layernorm" if op_name.startswith("input") else "post_attention_layernorm" output_state_dict[layer_name + "." + ln_name + "." + weight_or_bias] = params # Transpose the QKV matrix. elif op_name in ["attention.query_key_value", "self_attention.query_key_value"] and weight_or_bias == "weight": input_shape = params.size() shape = (heads, hidden_size_per_head, 3) + input_shape[1:] division = reduce(operator.mul, shape, 1) num_elements = params.numel() if num_elements != division: # model with gqa dont need to fix qkv ordering. output_state_dict[layer_name + ".self_attn.qkv_proj.weight"] = params else: out_val = fix_qwen_query_key_value_ordering( params, checkpoint_version, 3, heads, hidden_size_per_head ) # Megatron stores (3*D) x D but transformers-GPT2 expects D x 3*D. out_val = out_val.contiguous() # Store. output_state_dict[layer_name + ".self_attn.qkv_proj.weight"] = out_val # Transpose the bias. elif op_name in ["attention.query_key_value", "self_attention.query_key_value"] and weight_or_bias == "bias": out_val = fix_qwen_query_key_value_ordering( params, checkpoint_version, 3, heads, hidden_size_per_head ) # Store. No change of shape. output_state_dict[layer_name + ".self_attn.qkv_proj.bias"] = out_val # Transpose the weights. 
elif weight_or_bias == "weight": out_name = megatron_to_transformers[op_name] output_state_dict[layer_name + out_name + "weight"] = params # Copy the bias. # Ignore them elif weight_or_bias == "bias": pass # Copy the Rotary Embedding else: out_name = megatron_to_transformers[op_name] output_state_dict[layer_name + out_name] = params # The final layernorm. params = get_element_from_dict_by_path(tp_state_dicts[tp_rank], str(path)) if "final_norm.weight" in params or "final_layernorm.weight" in params: print("Converting final layernorm") final_norm_weight = params["final_norm.weight"] if "final_norm.weight" in params else params["final_layernorm.weight"] output_state_dict[f"{prefix_name}.norm.weight"] = final_norm_weight.to(hf_config.torch_dtype) # For LM head, transformers' wants the matrix to weight embeddings. params = get_element_from_dict_by_path(tp_state_dicts[tp_rank], 'model.language_model.output_layer.weight') if isinstance(params, dict): assert not params, "weight name of lm_head expect 'model.language_model.output_layer.weight'." elif params is not None: print("Converting LM head") output_state_dict["lm_head.weight" if is_vllm_v2() else "model.lm_head.weight"] = params.to(hf_config.torch_dtype) # It should be done! print("Conversion from Megatron-LM to Transformers is done!") return output_state_dict def convert_llama_state_dict_from_mcore_to_vllm(args, hf_config, qwen_version=None): """Convert NVIDIA Megatron-Core state_dict to vLLM llama state_dict. Args: args (argparse.Namespace): the arguments to the script """ assert qwen_version is None, f"Expect qwen_version is None for Llama, while {qwen_version}" tp_rank = mpu.get_tensor_model_parallel_rank() pp_rank = get_pipeline_model_parallel_rank() assert pp_rank == 0, "pipeline parallelism for mcore inference not supported for now." state_dict = load_rank0_state_dict(args) megatron_args = state_dict.get("args", None) if "checkpoint_version" in state_dict.keys(): checkpoint_version = state_dict["checkpoint_version"] else: checkpoint_version = 0.0 if megatron_args is None: raise ValueError( "Megatron-LM checkpoint does not contain arguments. This utility only supports Megatron-LM checkpoints" " containing all the megatron arguments. This is because it loads all config related to model" " architecture, the tensor and pipeline model parallel size from the checkpoint insead of user having to" " manually specify all the details. Please save Megatron-LM checkpoint along with all the megatron" " arguments to use this utility." ) output_state_dict = {} tp_size = megatron_args.tensor_model_parallel_size pp_size = megatron_args.pipeline_model_parallel_size assert pp_size == 1, "pipeline parallelism for mcore inference not supported for now." # The number of heads. heads = hf_config.num_attention_heads // tp_size # The hidden_size per head. hidden_size_per_head = hf_config.hidden_size // hf_config.num_attention_heads # The regex to extract layer names. layer_re = re.compile(r"decoder.layers\.(\d+)\.([a-z0-9_.]+)[\._]([a-z]+)") # Convert. 
print("Start to convert...") prefix_name = "model" if is_vllm_v2() else "model.model" # Embeddings print("Converting embeddings") tp_state_dicts = get_megatron_sharded_states(args, tp_size, pp_size, 0) # tp_state_dicts: list of state dict for each tp rank # tp_state_dicts[0]: a state dict for tp rank 0 # |-keys: dict_keys(['args', 'checkpoint_version', 'iteration', 'model', ...]) # |-tp_state_dicts[0]['model'] # |-keys: ['embedding.word_embeddings.weight', # 'decoder.layers.0.self_attention.core_attention.fused_attention._extra_state', # 'decoder.layers.0.self_attention.linear_proj.weight', # 'decoder.layers.0.self_attention.linear_proj._extra_state', # 'decoder.layers.0.self_attention.linear_qkv.layer_norm_weight', # 'decoder.layers.0.self_attention.linear_qkv.weight', # 'decoder.layers.0.self_attention.linear_qkv._extra_state', # 'decoder.layers.0.mlp.linear_fc1.layer_norm_weight', # 'decoder.layers.0.mlp.linear_fc1.weight', # 'decoder.layers.0.mlp.linear_fc1._extra_state', # 'decoder.layers.0.mlp.linear_fc2.weight', # 'decoder.layers.0.mlp.linear_fc2._extra_state', # ... # 'decoder.final_layernorm.weight', # 'output_layer.weight', # 'output_layer._extra_state' # Convert and store the position embeddings. position_embeddings = tp_state_dicts[0]['model'].get("embedding.position_embeddings.weight", None) if position_embeddings: output_state_dict["transformer.position_embeddings.weight"] = position_embeddings.to(hf_config.torch_dtype) # Convert and store the word embeddings. word_embeddings = tp_state_dicts[tp_rank]['model'].get("embedding.word_embeddings.weight", None) word_embeddings = word_embeddings.to(hf_config.torch_dtype) output_state_dict[f"{prefix_name}.embed_tokens.weight"] = word_embeddings # Reset the vocab size hf_config.vocab_size = word_embeddings.shape[0] # Transformer Layers print("Converting transformer layers") for key, val in tp_state_dicts[tp_rank]['model'].items(): if val is None: assert 'extra_state' in key, "weight/bias shouldn't be None except for extra_state in mcore" continue if "_extra_state" in key: continue # Match the name layer_match_res = layer_re.match(key) # Skip if that's not a layer if layer_match_res is None: continue # The index of the layer layer_idx = int(layer_match_res.group(1)) # The name of the operation. op_name = layer_match_res.group(2) # Is it a weight or a bias? weight_or_bias = layer_match_res.group(3) # The name of the layer layer_name = f"{prefix_name}.layers.{layer_idx}" params = val.to(hf_config.torch_dtype) # For layernorm(s), simply store the layer norm. if op_name.endswith("layer_norm") and weight_or_bias == 'weight': if op_name == "self_attention.linear_qkv.layer_norm": ln_name = "input_layernorm" elif op_name == "mlp.linear_fc1.layer_norm": ln_name = "post_attention_layernorm" else: assert False, f"Unrecognized op_name {op_name} for layer norm" output_state_dict[layer_name + "." + ln_name + "." + weight_or_bias] = params # Transpose the QKV matrix. elif op_name == "self_attention.linear_qkv" and weight_or_bias == 'weight': input_shape = params.size() shape = (heads, hidden_size_per_head, 3) + input_shape[1:] division = reduce(operator.mul, shape, 1) num_elements = params.numel() if num_elements != division: # model with gqa dont need to fix qkv ordering. output_state_dict[layer_name + ".self_attn.qkv_proj.weight"] = params else: out_val = fix_qwen_query_key_value_ordering( params, checkpoint_version, 3, heads, hidden_size_per_head ) # Megatron stores (3*D) x D but transformers-GPT2 expects D x 3*D. 
out_val = out_val.contiguous() # Store. output_state_dict[layer_name + ".self_attn.qkv_proj.weight"] = out_val # Transpose the bias. elif op_name == "self_attention.linear_qkv" and weight_or_bias == "bias": out_val = fix_qwen_query_key_value_ordering( params, checkpoint_version, 3, heads, hidden_size_per_head ) # Store. No change of shape. output_state_dict[layer_name + ".self_attn.qkv_proj.bias"] = out_val # Transpose the weights. elif weight_or_bias == "weight": out_name = mcore_to_transformers[op_name] output_state_dict[layer_name + out_name + "weight"] = params # Copy the bias. # Ignore them elif weight_or_bias == "bias": pass # Copy the Rotary Embedding else: out_name = mcore_to_transformers[op_name] output_state_dict[layer_name + out_name] = params if hf_config.num_hidden_layers != (layer_idx + 1): raise ValueError(f"Expected {hf_config.num_hidden_layers} layers but found {layer_idx + 1}") # The final layernorm. print("Converting final layernorm") final_norm_weight = tp_state_dicts[0]['model'].get("decoder.final_layernorm.weight", None) output_state_dict[f"{prefix_name}.norm.weight"] = final_norm_weight.to(hf_config.torch_dtype) # For LM head, transformers' wants the matrix to weight embeddings. print("Converting LM head") params = tp_state_dicts[tp_rank]['model'].get('output_layer.weight', None) output_state_dict["lm_head.weight" if is_vllm_v2() else "model.lm_head.weight"] = params.to(hf_config.torch_dtype) # It should be done! print("Conversion from Megatron-Core to Transformers is done!") return output_state_dict def convert_qwen_state_dict_from_megatron_to_vllm(args, hf_config, qwen_version=QwenVersion.v_1): # The converted output model. output_state_dict = {} # configuration for different versions of qwen if qwen_version == QwenVersion.v_1: prefix_name = "model.transformer." embed_name = "wte" layer_prefix = "h" final_norm = "ln_f" func_map = megatron_qwen_to_transformers elif qwen_version == QwenVersion.v_2: prefix_name = "model." if is_vllm_v2() else "model.model." embed_name = "embed_tokens" layer_prefix = "layers" final_norm = "norm" func_map = megatron_qwen2_to_transformers else: raise RuntimeError(f"Unsupported qwen version {qwen_version}, only 1.0 or 2.0 for now. while {qwen_version}.") tp_rank = mpu.get_tensor_model_parallel_rank() pp_rank = get_pipeline_model_parallel_rank() state_dict = load_rank0_state_dict(args) megatron_args = state_dict.get("args", None) if "checkpoint_version" in state_dict.keys(): checkpoint_version = state_dict["checkpoint_version"] else: checkpoint_version = 0.0 if megatron_args is None: raise ValueError( "Megatron-LM checkpoint does not contain arguments. This utility only supports Megatron-LM checkpoints" " containing all the megatron arguments. This is because it loads all config related to model" " architecture, the tensor and pipeline model parallel size from the checkpoint insead of user having to" " manually specify all the details. Please save Megatron-LM checkpoint along with all the megatron" " arguments to use this utility." ) tp_size = megatron_args.tensor_model_parallel_size pp_size = megatron_args.pipeline_model_parallel_size if hasattr(megatron_args, "moe_expert_model_parallel_size"): ep_size = megatron_args.moe_expert_model_parallel_size hep_size = tp_size * ep_size else: ep_size = 1 hep_size = tp_size # The number of heads. heads = hf_config.num_attention_heads // tp_size # The hidden_size per head. hidden_size_per_head = hf_config.hidden_size // hf_config.num_attention_heads # The regex to extract layer names. 
layer_re = re.compile(r"layers\.(\d+)\.([a-z0-9_.]+)\.([a-z]+)") # Convert. print("Start to convert...") # Embeddings print("Converting embeddings") tp_state_dicts = get_megatron_sharded_states(args, tp_size, pp_size, pp_rank) # Convert and store the word embeddings. if pp_rank == 0 or (pp_rank == pp_size - 1 and not megatron_args.untie_embeddings_and_output_weights) or \ (hasattr(megatron_args, "moe_num_experts") and megatron_args.moe_num_experts): embed_state_dict = tp_state_dicts if pp_rank == 0 else get_megatron_sharded_states(args, tp_size, pp_size, 0) word_embeddings = get_element_from_dict_by_path( embed_state_dict[tp_rank], "model.language_model.embedding.word_embeddings.weight" ) if isinstance(word_embeddings, dict): assert not word_embeddings, \ "weight name of word_embed expect 'model.word_embeddings_for_head.weight' \ or 'model.language_model.embedding.word_embeddings.weight'." elif word_embeddings is not None: # After training with megatron, word_embeddings is stored differently word_embeddings = word_embeddings.to(hf_config.torch_dtype) word_embeddings = word_embeddings[: hf_config.vocab_size, :] output_state_dict[f"{prefix_name}{embed_name}.weight"] = word_embeddings # Reset the vocab size hf_config.vocab_size = word_embeddings.shape[0] # Transformer Layers print("Converting transformer layers") if CURRENT_VLLM_VERSION in [VLLMVersion.v_0_6_3, VLLMVersion.v_0_6_6]: start_layer_idx, _ = get_pp_indices( hf_config.num_hidden_layers, pp_rank, pp_size ) layer_offset = start_layer_idx else: assert pp_size == 1, f"expect pipeline parallel size eq 1 for vllm {CURRENT_VLLM_VERSION}" layer_offset = hf_config.num_hidden_layers // pp_size * pp_rank # The transformer. path = ( "model.language_model.transformer" if "transformer" in get_element_from_dict_by_path(tp_state_dicts[0], "model.language_model").keys() else "model.language_model.encoder" ) # Extract the layers. gate_up_proj = {} for key, val in get_element_from_dict_by_path(tp_state_dicts[tp_rank], path).items(): # skip None value. # TODO(jiangle.jl): whether to process empty value. if val is None: continue # Match the name. m = layer_re.match(key) # Stop if that's not a layer if m is None: continue # The index of the layer. layer_idx = int(m.group(1)) + layer_offset # The name of the operation. op_name = m.group(2) # Is it a weight or a bias? weight_or_bias = m.group(3) # The name of the layer. layer_name = f"{prefix_name}{layer_prefix}.{layer_idx}" params = val.to(hf_config.torch_dtype) # For layernorm(s), simply store the layer norm. if op_name.endswith("layernorm"): if qwen_version == QwenVersion.v_1: if "attention." in op_name: output_state_dict[ layer_name + ".attn.attention_layernorm." + weight_or_bias ] = params if "mlp." in op_name: output_state_dict[ layer_name + "." + op_name + "." + weight_or_bias ] = params if op_name.startswith("input"): ln_name = "ln_1" if qwen_version == QwenVersion.v_1 else "input_layernorm" output_state_dict[ layer_name + "." + ln_name + "." + weight_or_bias ] = params elif op_name.startswith("post"): ln_name = "ln_2" if qwen_version == QwenVersion.v_1 else "post_attention_layernorm" output_state_dict[ layer_name + "." + ln_name + "." + weight_or_bias ] = params elif qwen_version == QwenVersion.v_2: raise RuntimeError(f"unsupport layernorm {op_name}.") elif op_name == "self_attention.rotary_emb": output_state_dict[layer_name + ".attn.rotary_emb.inv_freq"] = params # Transpose the QKV matrix and the bias. 
elif op_name in ["attention.query_key_value", "self_attention.query_key_value"]: if qwen_version == QwenVersion.v_1: out_val = fix_qwen_query_key_value_ordering( params, checkpoint_version, 3, heads, hidden_size_per_head ) # Megatron stores (3*D) x D but transformers-GPT2 expects D x 3*D. if len(list(out_val.shape)) > 1: out_val = out_val.contiguous() # Store. output_state_dict[layer_name + f".attn.c_attn.{weight_or_bias}"] = out_val else: num_query_groups = megatron_args.num_query_groups if megatron_args.group_query_attention else megatron_args.num_attention_heads params = split_attn_state(params, heads, num_query_groups // tp_size, hidden_size_per_head, hf_config.hidden_size) # Store. No change of shape. output_state_dict[layer_name + f".self_attn.qkv_proj.{weight_or_bias}"] = params elif op_name in ["mlp.dense_h_to_4h"]: offset = params.shape[0] // 2 w1 = params[:offset,:] w2 = params[offset:,:] out_name = func_map[op_name] out_name = layer_name + out_name + "weight" output_state_dict[out_name] = torch.cat([w2, w1], dim=0) elif op_name in ["mlp.w1", "mlp.w2"]: gate_up_proj[op_name] = params if len(gate_up_proj) == 2: gate_up_proj = [gate_up_proj["mlp.w2"], gate_up_proj["mlp.w1"]] out_name = func_map[op_name] gate_up_proj_name = layer_name + out_name + "weight" output_state_dict[gate_up_proj_name] = torch.cat(gate_up_proj, dim=0) gate_up_proj = {} elif op_name in ["mlp.shared_experts.dense_h_to_4h"]: out_name = func_map[op_name] gate_up_proj_name = layer_name + out_name + "weight" w1, w2 = params.chunk(2, dim=0) output_state_dict[gate_up_proj_name] = torch.cat([w2, w1], dim=0).contiguous() elif "mlp.experts" in op_name: # For w13_weight and w2_weight, each tp slice contains part of expert weights. # qwen w13_weight when tp = 4 (pp=1,ep=1): # rank 0: [[0.1, 0.2], [0.3, 0.4]] # rank 1: [[1.1, 1.2], [1.3, 1.4]] # rank 2: [[2.1, 2.2], [2.3, 2.4]] # rank 3: [[3.1, 3.2], [3.3, 3.4]] # vLLM w13_weight when tp = 4 (pp=1,ep=1): # rank 0: [[0.1, 1.1], [2.1, 3.1]] # rank 1: [[0.2, 1.2], [2.2, 3.2]] # rank 2: [[0.3, 1.3], [2.3, 3.3]] # rank 3: [[0.4, 1.4], [2.4, 3.4]] # w2_weight as well. 
out_name = func_map[op_name] moe_num_experts = megatron_args.moe_num_experts local_num_experts = moe_num_experts // hep_size if "dense_h_to_4h" in op_name: params_list = [] for rank in range(tp_size): if rank != tp_rank: params = get_element_from_dict_by_path(tp_state_dicts[rank], path)[key] params_list.append(params) val_list = [] for params in params_list: params = params.view((moe_num_experts, -1, hf_config.hidden_size)).contiguous() params = params.reshape((local_num_experts * 2, -1, hf_config.hidden_size)) params = params.chunk(tp_size, dim=1)[tp_rank] params = params.reshape(params.shape[0] // 2, -1, hf_config.hidden_size) params_right, params_left = params.chunk(2, dim=1) params = torch.cat([params_left, params_right], dim=1).contiguous() val_list.append(params) val = torch.cat(val_list, dim=0).contiguous() elif "dense_4h_to_h" in op_name: params_list = [] for rank in range(tp_size): if rank != tp_rank: params = get_element_from_dict_by_path(tp_state_dicts[rank], path)[key] params = params.view((moe_num_experts, -1, hf_config.hidden_size)).contiguous() params_list.append(params) val_list = [] for params in params_list: params = params.reshape((local_num_experts, -1, hf_config.hidden_size)) params = params.chunk(tp_size, dim=1)[tp_rank] val_list.append(params) val = torch.cat(val_list, dim=0).transpose(1, 2).contiguous() else: raise RuntimeError(f"only support routed weight name 'dense_h_to_4h' or 'dense_4h_to_h' for qwen2_moe. while {op_name}.") output_state_dict[layer_name + out_name] = val # Transpose the weights. elif weight_or_bias == "weight": out_name = func_map[op_name] output_state_dict[layer_name + out_name + "weight"] = params # Copy the bias. elif weight_or_bias == "bias": out_name = func_map[op_name] output_state_dict[layer_name + out_name + "bias"] = params # The final layernorm. if hasattr(megatron_args, "moe_num_experts") and megatron_args.moe_num_experts: final_state_dicts = get_megatron_sharded_states(args, tp_size, pp_size, pp_size - 1) params = get_element_from_dict_by_path(final_state_dicts[tp_rank], str(path)) else: params = get_element_from_dict_by_path(tp_state_dicts[tp_rank], str(path)) if "final_norm.weight" in params or "final_layernorm.weight" in params: final_norm_weight = params["final_norm.weight"] if "final_norm.weight" in params else params["final_layernorm.weight"] output_state_dict[f"{prefix_name}{final_norm}.weight"] = final_norm_weight.to(hf_config.torch_dtype) if "final_norm.bias" in params or "final_layernorm.bias" in params: final_norm_bias = params["final_norm.bias"] if "final_norm.bias" in params else params["final_layernorm.bias"] output_state_dict[f"{prefix_name}{final_norm}.bias"] = final_norm_bias.to(hf_config.torch_dtype) # For LM head, transformers' wants the matrix to weight embeddings. 
print("Converting LM head") lm_head_name = "lm_head.weight" if is_vllm_v2() else "model.lm_head.weight" if megatron_args.untie_embeddings_and_output_weights: if hasattr(megatron_args, "moe_num_experts") and megatron_args.moe_num_experts: params = get_element_from_dict_by_path(final_state_dicts[tp_rank], 'model.language_model.output_layer.weight') else: params = get_element_from_dict_by_path(tp_state_dicts[tp_rank], 'model.language_model.output_layer.weight') if (isinstance(params, dict) and len(params.keys())) or (params is not None and not isinstance(params, dict)): output_state_dict[lm_head_name] = params.to(hf_config.torch_dtype) elif pp_rank == 0 or (pp_rank == pp_size - 1) or (hasattr(megatron_args, "moe_num_experts") and megatron_args.moe_num_experts): output_state_dict[lm_head_name] = word_embeddings # It should be done! print("Conversion from Megatron-LM to Transformers is done!") return output_state_dict def print_rank_0(message): """If distributed is initialized, print only on rank 0.""" if torch.distributed.is_initialized(): if torch.distributed.get_rank() == 0: print(message, flush=True) else: print(message, flush=True) def _load_base_checkpoint(load_dir, rank0=False): """ Load the base state_dict from the given directory If rank0 is true, just loads rank 0 checkpoint, ignoring arguments. """ # Read the tracker file and set the iteration. tracker_filename = get_checkpoint_tracker_filename(load_dir) # If no tracker file, return nothing if not os.path.isfile(tracker_filename): if not rank0: print_rank_0('WARNING: could not find the metadata file {} '.format( tracker_filename)) print_rank_0(' will not load any checkpoints and will start from ' 'random') return None, "", False # Otherwise, read the tracker file and either set the iteration or # mark it as a release checkpoint. iteration, release = read_metadata(tracker_filename) # Checkpoint. if rank0: checkpoint_name = find_checkpoint_rank_0(load_dir, iteration, release) else: checkpoint_name = get_checkpoint_name(load_dir, iteration, release) if release: print_rank_0(f' loading release checkpoint from {load_dir}') else: print_rank_0(f' loading checkpoint from {load_dir} at iteration {iteration}') if isinstance(checkpoint_name, tuple): checkpoint_name = checkpoint_name[0] # Load the checkpoint. try: state_dict = torch.load(checkpoint_name, map_location='cpu') except ModuleNotFoundError: try: from megatron.fp16_deprecated import loss_scaler # pylint: disable=import-outside-toplevel,unused-import # For backward compatibility. if not rank0: print_rank_0(' > deserializing using the old code structure ...') sys.modules['fp16.loss_scaler'] = sys.modules[ 'megatron.fp16_deprecated.loss_scaler'] sys.modules['megatron.fp16.loss_scaler'] = sys.modules[ 'megatron.fp16_deprecated.loss_scaler'] state_dict = torch.load(checkpoint_name, map_location='cpu') sys.modules.pop('fp16.loss_scaler', None) sys.modules.pop('megatron.fp16.loss_scaler', None) except ImportError: from megatron.legacy.fp16_deprecated import loss_scaler # pylint: disable=import-outside-toplevel,unused-import # For backward compatibility. 
if not rank0: print_rank_0(' > deserializing using the old code structure ...') sys.modules['fp16.loss_scaler'] = sys.modules[ 'megatron.legacy.fp16_deprecated.loss_scaler'] sys.modules['megatron.fp16.loss_scaler'] = sys.modules[ 'megatron.legacy.fp16_deprecated.loss_scaler'] sys.modules['megatron.model'] = sys.modules['megatron.legacy.model'] state_dict = torch.load(checkpoint_name, map_location='cpu') sys.modules.pop('fp16.loss_scaler', None) sys.modules.pop('megatron.fp16.loss_scaler', None) sys.modules.pop('megatron.model', None) except BaseException as e: print_rank_0('could not load the checkpoint') print_rank_0(e) sys.exit() return state_dict, checkpoint_name, release def load_checkpoint(model, optimizer, opt_param_scheduler, load_arg='load', strict=True, model_args=None): """"Transform parallel strategy for checkpoint if needed.""" if model_args is not None: args = model_args else: args = model.model_args if args.get("adaptive_parallel_strategy_on_checkpoint"): load_dir = args[load_arg] target_tp = args.get("tensor_model_parallel_size") target_pp = args.get("pipeline_model_parallel_size") state_dict, _, _ = _load_base_checkpoint(load_dir, rank0=True) checkpoint_args = state_dict['args'] checkpoint_tp = checkpoint_args.tensor_model_parallel_size checkpoint_pp = checkpoint_args.pipeline_model_parallel_size if target_tp != checkpoint_tp or target_pp != checkpoint_pp: script_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "../tools/megatron_checkpoint_utils.py") save_dir = load_dir[:-1] if load_dir.endswith("/") else load_dir save_dir = save_dir + f"-transform-tp{target_tp}-pp{target_pp}" if not os.path.exists(save_dir): if torch.distributed.get_rank() == (torch.distributed.get_world_size() - 1): model_type = "GPT" use_legacy_models = get_use_legacy_models(args) if use_legacy_models: cmd = f"python {script_path} --model-type {model_type} --load-dir {args.get('load')} " + \ f"--save-dir {save_dir} --target-tensor-parallel-size {target_tp} " + \ f"--target-pipeline-parallel-size {target_pp}" else: cmd = f"python {script_path} --model-type {model_type} --loader mcore --load-dir {args.get('load')} " + \ f"--saver mcore --save-dir {save_dir} --target-tensor-parallel-size {target_tp} " + \ f"--target-pipeline-parallel-size {target_pp}" subprocess.run(cmd, shell=True, check=True) torch.distributed.barrier() args[load_arg] = save_dir print_rank_0(f"Using transformed checkpoint {save_dir}") return vllm_load_checkpoint(model, optimizer, opt_param_scheduler, load_arg=load_arg, strict=strict, model_args=model_args) def vllm_load_checkpoint(model, optimizer, opt_param_scheduler, load_arg='load', strict=True, model_args=None): """Load a model checkpoint and return the iteration. strict (bool): whether to strictly enforce that the keys in :attr:`state_dict` of the checkpoint match the names of parameters and buffers in model. """ if model_args is not None: args = model_args else: args = model.model_args load_dir = args[load_arg] model = [unwrap_model(model)] state_dict, checkpoint_name, release = _load_base_checkpoint(load_dir, rank0=False) # Checkpoint not loaded. if state_dict is None: # Conditionally exit at this point. if args.get("exit_on_missing_checkpoint", False): print_rank_0(">> '--exit-on-missing-checkpoint' set ... exiting. <<") torch.distributed.barrier() sys.exit() # Iteration defaults to 0. return 0 # Set checkpoint version. set_checkpoint_version(state_dict.get('checkpoint_version', 0)) # Set iteration. 
if args.get("finetune", True) or release: iteration = 0 else: try: iteration = state_dict['iteration'] except KeyError: try: # Backward compatible with older checkpoints iteration = state_dict['total_iters'] except KeyError: print_rank_0('A metadata file exists but unable to load ' 'iteration from checkpoint {}, exiting'.format( checkpoint_name)) sys.exit() # Check arguments. if 'args' in state_dict and not args.get("finetune", True): checkpoint_args = state_dict['args'] args["consumed_train_samples"] = getattr(checkpoint_args, 'consumed_train_samples', 0) update_num_microbatches(consumed_samples=args["consumed_train_samples"]) args["consumed_valid_samples"] = getattr(checkpoint_args, 'consumed_valid_samples', 0) else: print_rank_0('could not find arguments in the checkpoint ...') # Model. if len(model) == 1: model[0].load_state_dict(state_dict['model'], strict=strict) else: for i in range(len(model)): # pylint: disable=consider-using-enumerate mpu.set_virtual_pipeline_model_parallel_rank(i) model[i].load_state_dict(state_dict['model%d' % i], strict=strict) # Fix up query/key/value matrix ordering if needed. checkpoint_version = get_checkpoint_version() print_rank_0(f' checkpoint version {checkpoint_version}') fix_query_key_value_ordering(model, checkpoint_version) # Optimizer. if not release and not args.get("finetune", True) and not args["no_load_optim"]: try: # Load state dict. if optimizer is not None: optimizer.load_state_dict(state_dict['optimizer']) # Load distributed optimizer's custom parameter state. if args["use_distributed_optimizer"]: tracker_filename = get_checkpoint_tracker_filename(load_dir) iteration, release = read_metadata(tracker_filename) model_checkpoint_name = \ get_checkpoint_name(load_dir, iteration, release) optim_checkpoint_name = \ get_distributed_optimizer_checkpoint_name( model_checkpoint_name) optimizer.load_parameter_state(optim_checkpoint_name) # Load scheduler. if opt_param_scheduler is not None: if 'lr_scheduler' in state_dict: # backward compatbility opt_param_scheduler.load_state_dict(state_dict['lr_scheduler']) else: opt_param_scheduler.load_state_dict(state_dict['opt_param_scheduler']) except KeyError: print_rank_0('Unable to load optimizer from checkpoint {}. ' 'Specify --no-load-optim or --finetune to prevent ' 'attempting to load the optimizer state, ' 'exiting ...'.format(checkpoint_name)) sys.exit() else: if (args.get("fp16") or args.get("bf16")) and optimizer is not None: optimizer.reload_model_params() # rng states. 
    if not release and ("finetune" in args and not args["finetune"]) and \
            ("no_load_rng" in args and not args["no_load_rng"]):
        try:
            if 'rng_state' in state_dict:
                # access rng_state for data parallel rank
                if args.get("data_parallel_random_init", False):
                    rng_state = state_dict['rng_state'][mpu.get_data_parallel_rank()]
                else:
                    rng_state = state_dict['rng_state'][0]
                random.setstate(rng_state['random_rng_state'])
                np.random.set_state(rng_state['np_rng_state'])
                torch.set_rng_state(rng_state['torch_rng_state'])
                torch.cuda.set_rng_state(rng_state['cuda_rng_state'])
                # Check for empty states array
                if not rng_state['rng_tracker_states']:
                    raise KeyError
                tensor_parallel.get_cuda_rng_tracker().set_states(
                    rng_state['rng_tracker_states'])
            else:  # backward compatibility
                random.setstate(state_dict['random_rng_state'])
                np.random.set_state(state_dict['np_rng_state'])
                torch.set_rng_state(state_dict['torch_rng_state'])
                torch.cuda.set_rng_state(state_dict['cuda_rng_state'])
                # Check for empty states array
                if not state_dict['rng_tracker_states']:
                    raise KeyError
                tensor_parallel.get_cuda_rng_tracker().set_states(
                    state_dict['rng_tracker_states'])
        except KeyError:
            print_rank_0('Unable to load rng state from checkpoint {}. '
                         'Specify --no-load-rng or --finetune to prevent '
                         'attempting to load the rng state, '
                         'exiting ...'.format(checkpoint_name))
            sys.exit()

    # Some utilities want to load a checkpoint without distributed being initialized
    if torch.distributed.is_initialized():
        torch.distributed.barrier()

    print_rank_0(f' successfully loaded checkpoint from {args["load"]}')

    return iteration


def get_checkpoint_name(checkpoints_path, iteration, release=False,
                        pipeline_parallel=None,
                        tensor_rank=None, pipeline_rank=None,
                        expert_parallel=None, expert_rank=None):
    """Determine the directory name for this rank's checkpoint."""
    if release:
        directory = 'release'
    else:
        directory = 'iter_{:07d}'.format(iteration)

    # Use both the tensor and pipeline MP rank.
    if pipeline_parallel is None:
        pipeline_parallel = (get_pipeline_model_parallel_world_size() > 1)
    if tensor_rank is None:
        tensor_rank = mpu.get_tensor_model_parallel_rank()
    if pipeline_rank is None:
        pipeline_rank = get_pipeline_model_parallel_rank()
    if expert_parallel is None:
        expert_parallel = False  # (mpu.get_expert_model_parallel_world_size() > 1)
    if expert_rank is None:
        expert_rank = 0  # mpu.get_expert_model_parallel_rank()

    # Use both the tensor and pipeline MP rank. If using the distributed
    # optimizer, then the optimizer's path must additionally include the
    # data parallel rank.
    if not pipeline_parallel:
        common_path = os.path.join(checkpoints_path, directory,
                                   f'mp_rank_{tensor_rank:02d}')
    else:
        common_path = os.path.join(checkpoints_path, directory,
                                   f'mp_rank_{tensor_rank:02d}_{pipeline_rank:03d}')
    if expert_parallel:
        common_path = common_path + f'_{expert_rank:03d}'

    model_path = os.path.join(common_path, "model_optim_rng.pt")
    if not os.path.exists(model_path):
        model_path = os.path.join(common_path, "model_rng.pt")
    return model_path
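
# ---------------------------------------------------------------------------
# Illustrative sketch (not part of the original module): a minimal example of
# how a ParameterSyncMap subclass translates Megatron parameter names into the
# corresponding vLLM names. The src_names below are hypothetical sample names,
# layer_offset=0 assumes the first pipeline stage, and the exact destination
# prefix depends on is_vllm_v2().
def _example_sync_map_usage():
    src_names = [
        "module.module.language_model.embedding.word_embeddings.weight",
        "module.module.language_model.encoder.layers.0.self_attention.query_key_value.weight",
        "module.module.language_model.encoder.layers.0.self_attention.dense.weight",
        "module.module.language_model.encoder.final_norm.weight",
        "module.module.language_model.output_layer.weight",
    ]
    sync_map = Megatron2LlamaSyncMap(src_names, layer_offset=0)
    # dst_names is computed lazily on first access via map_src_to_dst(), e.g.
    # "...layers.0.self_attention.query_key_value.weight" -> "...layers.0.self_attn.qkv_proj.weight".
    return list(zip(src_names, sync_map.dst_names))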
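
# ---------------------------------------------------------------------------
# Shape sketch for split_attn_state (hypothetical sizes, not from the original
# module): with num_heads=4, num_query_groups=2, kv_channels=8 and
# hidden_size=32, the fused Megatron QKV weight has
# ng * (dim*nh//ng + 2*dim) = 2 * (16 + 16) = 64 rows; split_attn_state
# regroups them into q (32 rows), k (16 rows) and v (16 rows) concatenated
# along dim 0, which is the layout vLLM's qkv_proj expects.
def _example_split_attn_state():
    fused_qkv = torch.randn(64, 32)
    out = split_attn_state(fused_qkv, num_heads=4, num_query_groups=2,
                           kv_channels=8, hidden_size=32)
    assert out.shape == (64, 32)
    return out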