in megatron_patch/model/mixtral/transformer_config.py [0:0]
def __post_init__(self):
"""Python dataclass method that is used to modify attributes after initialization.
See https://docs.python.org/3/library/dataclasses.html#post-init-processing for more
details.
"""
super().__post_init__()
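# Precision flags are mutually exclusive, and attention heads must shard evenly across TP ranks.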
if self.fp16 and self.bf16:
raise ValueError(
f'Only one of self.fp16: {self.fp16} and self.bf16: {self.bf16} should be True.'
)
if self.num_attention_heads % self.tensor_model_parallel_size != 0:
raise ValueError(
f"num_attention_heads ({self.num_attention_heads}) must be a multiple of "
f"tensor_model_parallel_size ({self.tensor_model_parallel_size})."
)
if self.ffn_hidden_size is None:
self.ffn_hidden_size = 4 * self.hidden_size
if self.kv_channels is None:
self.kv_channels = self.hidden_size // self.num_attention_heads
if self.num_query_groups is None:
self.num_query_groups = self.num_attention_heads
if self.num_query_groups % self.tensor_model_parallel_size != 0:
raise ValueError(
f"num_query_groups ({self.num_query_groups}) must be a multiple of "
f"tensor_model_parallel_size ({self.tensor_model_parallel_size})."
)
if self.apply_query_key_layer_scaling:
self.attention_softmax_in_fp32 = True
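# MoE checks: expert parallelism, shared experts, and expert capacity.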
if self.expert_model_parallel_size > 1 and self.num_moe_experts is None:
raise ValueError('num_moe_experts must not be None to use expert model parallelism.')
if self.num_moe_experts is not None and self.num_moe_experts <= 0:
raise ValueError('num_moe_experts must be a positive integer.')
if self.moe_shared_expert_intermediate_size is not None:
if self.moe_shared_expert_intermediate_size <= 0:
raise ValueError(
f'moe_shared_expert_intermediate_size must be a positive value, '
f'i.e. num_shared_experts * ffn_size_of_each_shared_expert, '
f'but got {self.moe_shared_expert_intermediate_size}'
)
if self.moe_shared_expert_overlap and self.moe_token_dispatcher_type not in [
"alltoall"
]:
raise ValueError(
'moe_shared_expert_overlap only works with the alltoall token dispatcher.'
)
if self.moe_expert_capacity_factor is not None:
if self.moe_token_dispatcher_type not in ["alltoall", "alltoall_seq"]:
raise ValueError(
'moe_expert_capacity_factor only works with the alltoall or alltoall_seq token dispatcher'
)
if self.moe_expert_capacity_factor < 0:
self.moe_expert_capacity_factor = None
if self.moe_router_load_balancing_type not in ["aux_loss", "none"]:
raise ValueError(
'moe_expert_capacity_factor only works with aux_loss or none load balancing'
)
if self.moe_pad_expert_input_to_capacity:
if self.moe_expert_capacity_factor is None:
raise ValueError(
'moe_expert_capacity_factor must be set to use moe_pad_expert_input_to_capacity'
)
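# CPU offloading: layer count must be in range, and it is incompatible with pipeline parallelism and activation recomputation.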
if self.cpu_offloading and (
self.cpu_offloading_num_layers < 0 or self.cpu_offloading_num_layers >= self.num_layers
):
raise ValueError(
f'cpu_offloading_num_layers ({self.cpu_offloading_num_layers}) must be between 0 and '
f'num_layers - 1 ({self.num_layers - 1}) when cpu_offloading is enabled'
)
if self.cpu_offloading and self.pipeline_model_parallel_size > 1:
raise ValueError(
'Currently there is no support for Pipeline parallelism with CPU offloading'
)
if self.cpu_offloading and self.recompute_granularity is not None:
raise ValueError(
'CPU offloading does not work when activation recomputation is enabled'
)
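# Activation recomputation: validate granularity, method, and layer-count settings.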
if self.recompute_granularity is not None:
if self.recompute_granularity not in ['full', 'selective']:
raise ValueError(
f'recompute_granularity: {self.recompute_granularity} must be "full" '
'or "selective".'
)
if self.recompute_method is not None:
if self.recompute_method not in ['block', 'uniform']:
raise ValueError(
f'recompute_method: {self.recompute_method} must be "block" or "uniform".'
)
elif self.recompute_granularity != 'selective':
raise ValueError(
f'When using recompute_granularity: {self.recompute_granularity}, '
'recompute_method must be "block" or "uniform".'
)
if self.recompute_granularity != 'selective' and self.recompute_num_layers is None:
raise ValueError(
f'When using recompute_granularity: {self.recompute_granularity} '
'recompute_num_layers must be between '
'1 and num_layers_per_pipeline_rank: '
f'{self.num_layers // self.pipeline_model_parallel_size}'
)
elif (
self.recompute_granularity == 'selective' and self.recompute_num_layers is not None
):
raise ValueError(
f'When using recompute_granularity: {self.recompute_granularity} '
'recompute_num_layers must be None.'
)
if self.distribute_saved_activations and self.sequence_parallel:
raise ValueError(
f'distribute_saved_activations: {self.distribute_saved_activations} must be '
f'False when sequence_parallel is enabled: {self.sequence_parallel}'
)
if self.virtual_pipeline_model_parallel_size is not None:
if not self.num_layers % self.virtual_pipeline_model_parallel_size == 0:
raise ValueError(
f'num_layers: {self.num_layers} must be divisible by '
f'virtual_pipeline_model_parallel_size {self.virtual_pipeline_model_parallel_size}'
)
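# Kernel fusion constraints: bias+activation fusion, FP8 activation-input store, and fused RoPE.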
if self.bias_activation_fusion:
if self.activation_func not in [F.gelu, F.silu]:
raise ValueError(
"When bias_activation_fusion is True, activation function should be either "
"gelu or swiglu"
)
if (
self.activation_func == F.gelu
and not self.gated_linear_unit
and not self.add_bias_linear
):
raise ValueError(
"When bias_activation_fusion is True, gated_linear_unit is False, "
"and activation function is gelu, add_bias_linear must also be True."
)
if self.activation_func_fp8_input_store:
if self.activation_func != F.silu or not self.gated_linear_unit:
raise ValueError("Storing activation input in FP8 is supported only for SwiGLU.")
if self.apply_rope_fusion:
if self.rotary_interleaved:
raise ValueError("rotary_interleaved does not work with apply_rope_fusion.")
from megatron.core.models.common.embeddings.rope_utils import HAVE_APPLY_ROPE_FUSION
if not HAVE_APPLY_ROPE_FUSION:
raise ValueError(
"apply_rope_fusion is not available. Please install TE >= 1.4 or Apex."
)
if self.multi_latent_attention and self.rotary_interleaved:
raise ValueError("rotary_interleaved does not work with multi_latent_attention.")
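# Default weight initializers when none are provided.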
if self.init_method is None:
self.init_method = init_method_normal(self.init_method_std)
if self.output_layer_init_method is None:
self.output_layer_init_method = scaled_init_method_normal(
self.init_method_std, self.num_layers
)
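# Extended TP for MoE requires the allgather dispatcher and an FFN width divisible by TP * EP.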
if self.moe_extended_tp:
if self.moe_token_dispatcher_type != 'allgather':
raise ValueError(
"Moe extended TP parallelism only applies to allgather based token dispatcher."
)
extended_tp_size = self.tensor_model_parallel_size * self.expert_model_parallel_size
if self.ffn_hidden_size % extended_tp_size != 0:
raise ValueError(
f'ffn_hidden_size: {self.ffn_hidden_size} must be divisible by '
f'extended_tp_size {extended_tp_size}'
)
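# FP8 MoE training and FP8 grouped GEMM require sufficiently recent Transformer Engine versions.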
if self.num_moe_experts and self.fp8:
# TE versions below 1.7.0 raise an error when an expert receives zero tokens
if not is_te_min_version("1.7.0.dev0"):
raise ValueError(
"Only transformer-engine>=1.7.0 supports MoE FP8 training, "
f"but your version is {get_te_version()}."
)
if self.moe_grouped_gemm and not is_te_min_version("1.11.0"):
raise ValueError(
"Only transformer-engine>=1.11.0 supports FP8 grouped gemm, "
f"but your version is {get_te_version()}."
)
if self.flash_decode and self.fp8:
raise ValueError("FP8 inference is currently not supported with flash decoding.")
if self.moe_token_dispatcher_type in ['allgather', 'alltoall_seq']:
if self.variable_seq_lengths is True:
raise ValueError(
f"Token dispatcher type: {self.moe_token_dispatcher_type} does not support "
f"variable sequence length, please use alltoall dispatcher instead."
)
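# cp_comm_type is either a single string applied to all layers or a per-layer list.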
if self.cp_comm_type is not None:
if isinstance(self.cp_comm_type, list):
assert len(self.cp_comm_type) == self.num_layers, (
f"Length of cp_comm_type ({len(self.cp_comm_type)}) should equal to "
f"the total number of transformer layers ({self.num_layers})!"
)
else:
assert isinstance(
self.cp_comm_type, str
), "Unsupported communication type for context parallelism!"
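# Illustrative failure cases (hypothetical values, not taken from this repo): constructing
# the config with fp16=True and bf16=True, or with num_attention_heads=30 and
# tensor_model_parallel_size=4, fails fast here with the corresponding ValueError.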