def main()

in scripts/convert_sana_to_diffusers.py


def main(args):
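    # NOTE: ckpt_ids, model_kwargs, CTX, and weight_dtype are module-level names defined
    # elsewhere in this script; only the body of main() appears in this excerpt.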
    cache_dir_path = os.path.expanduser("~/.cache/huggingface/hub")

    if args.orig_ckpt_path is None or args.orig_ckpt_path in ckpt_ids:
        ckpt_id = args.orig_ckpt_path or ckpt_ids[0]
        snapshot_download(
            repo_id="/".join(ckpt_id.split("/")[:2]),
            cache_dir=cache_dir_path,
            repo_type="model",
        )
        file_path = hf_hub_download(
            repo_id="/".join(ckpt_id.split("/")[:2]),
            filename="/".join(ckpt_id.split("/")[2:]),
            cache_dir=cache_dir_path,
            repo_type="model",
        )
    else:
        file_path = args.orig_ckpt_path

    print(colored(f"Loading checkpoint from {file_path}", "green", attrs=["bold"]))
    all_state_dict = torch.load(file_path, weights_only=True)
    state_dict = all_state_dict.pop("state_dict")
    converted_state_dict = {}
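    # Conversion strategy: each original tensor is pop()'d out of state_dict as it is
    # remapped into converted_state_dict; any key left over at the end trips the assert
    # near the bottom of this function.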

    # Patch embeddings.
    converted_state_dict["patch_embed.proj.weight"] = state_dict.pop("x_embedder.proj.weight")
    converted_state_dict["patch_embed.proj.bias"] = state_dict.pop("x_embedder.proj.bias")

    # Caption projection.
    converted_state_dict["caption_projection.linear_1.weight"] = state_dict.pop("y_embedder.y_proj.fc1.weight")
    converted_state_dict["caption_projection.linear_1.bias"] = state_dict.pop("y_embedder.y_proj.fc1.bias")
    converted_state_dict["caption_projection.linear_2.weight"] = state_dict.pop("y_embedder.y_proj.fc2.weight")
    converted_state_dict["caption_projection.linear_2.bias"] = state_dict.pop("y_embedder.y_proj.fc2.bias")

    # Handle the different time embedding structures based on the model type.
    if args.model_type in ["SanaSprint_1600M_P1_D20", "SanaSprint_600M_P1_D28"]:
        # For Sana Sprint, the time embedding structure is different
        converted_state_dict["time_embed.timestep_embedder.linear_1.weight"] = state_dict.pop(
            "t_embedder.mlp.0.weight"
        )
        converted_state_dict["time_embed.timestep_embedder.linear_1.bias"] = state_dict.pop("t_embedder.mlp.0.bias")
        converted_state_dict["time_embed.timestep_embedder.linear_2.weight"] = state_dict.pop(
            "t_embedder.mlp.2.weight"
        )
        converted_state_dict["time_embed.timestep_embedder.linear_2.bias"] = state_dict.pop("t_embedder.mlp.2.bias")

        # Guidance embedder for Sana Sprint
        converted_state_dict["time_embed.guidance_embedder.linear_1.weight"] = state_dict.pop(
            "cfg_embedder.mlp.0.weight"
        )
        converted_state_dict["time_embed.guidance_embedder.linear_1.bias"] = state_dict.pop("cfg_embedder.mlp.0.bias")
        converted_state_dict["time_embed.guidance_embedder.linear_2.weight"] = state_dict.pop(
            "cfg_embedder.mlp.2.weight"
        )
        converted_state_dict["time_embed.guidance_embedder.linear_2.bias"] = state_dict.pop("cfg_embedder.mlp.2.bias")
    else:
        # Original Sana time embedding structure
        converted_state_dict["time_embed.emb.timestep_embedder.linear_1.weight"] = state_dict.pop(
            "t_embedder.mlp.0.weight"
        )
        converted_state_dict["time_embed.emb.timestep_embedder.linear_1.bias"] = state_dict.pop(
            "t_embedder.mlp.0.bias"
        )
        converted_state_dict["time_embed.emb.timestep_embedder.linear_2.weight"] = state_dict.pop(
            "t_embedder.mlp.2.weight"
        )
        converted_state_dict["time_embed.emb.timestep_embedder.linear_2.bias"] = state_dict.pop(
            "t_embedder.mlp.2.bias"
        )

    # Shared norm.
    converted_state_dict["time_embed.linear.weight"] = state_dict.pop("t_block.1.weight")
    converted_state_dict["time_embed.linear.bias"] = state_dict.pop("t_block.1.bias")

    # y norm
    converted_state_dict["caption_norm.weight"] = state_dict.pop("attention_y_norm.weight")

    # scheduler
    if args.image_size == 4096:
        flow_shift = 6.0
    else:
        flow_shift = 3.0
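    # flow_shift is consumed only by the flow-matching schedulers built in the
    # save_full_pipeline branch at the end of this function.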

    # model config
    if args.model_type in ["SanaMS_1600M_P1_D20", "SanaSprint_1600M_P1_D20", "SanaMS1.5_1600M_P1_D20"]:
        layer_num = 20
    elif args.model_type in ["SanaMS_600M_P1_D28", "SanaSprint_600M_P1_D28"]:
        layer_num = 28
    elif args.model_type == "SanaMS_4800M_P1_D60":
        layer_num = 60
    else:
        raise ValueError(f"{args.model_type} is not supported.")
    # Positional embedding interpolation scale.
    interpolation_scale = {512: None, 1024: None, 2048: 1.0, 4096: 2.0}
    qk_norm = (
        "rms_norm_across_heads"
        if args.model_type
        in ["SanaMS1.5_1600M_P1_D20", "SanaMS1.5_4800M_P1_D60", "SanaSprint_600M_P1_D28", "SanaSprint_1600M_P1_D20"]
        else None
    )
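    # A non-None qk_norm marks the Sana 1.5 and Sana Sprint checkpoints, which also store
    # q_norm/k_norm weights; those are picked up inside the block loop below.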

    for depth in range(layer_num):
        # Transformer blocks.
        converted_state_dict[f"transformer_blocks.{depth}.scale_shift_table"] = state_dict.pop(
            f"blocks.{depth}.scale_shift_table"
        )

        # Linear Attention is all you need 🤘
        # Self attention.
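        # The original checkpoint fuses Q, K and V into a single linear layer; split it
        # into three equal chunks along the output dimension for to_q/to_k/to_v.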
        q, k, v = torch.chunk(state_dict.pop(f"blocks.{depth}.attn.qkv.weight"), 3, dim=0)
        converted_state_dict[f"transformer_blocks.{depth}.attn1.to_q.weight"] = q
        converted_state_dict[f"transformer_blocks.{depth}.attn1.to_k.weight"] = k
        converted_state_dict[f"transformer_blocks.{depth}.attn1.to_v.weight"] = v
        if qk_norm is not None:
            # Add Q/K normalization for self-attention (attn1) - needed for Sana-Sprint and Sana-1.5
            converted_state_dict[f"transformer_blocks.{depth}.attn1.norm_q.weight"] = state_dict.pop(
                f"blocks.{depth}.attn.q_norm.weight"
            )
            converted_state_dict[f"transformer_blocks.{depth}.attn1.norm_k.weight"] = state_dict.pop(
                f"blocks.{depth}.attn.k_norm.weight"
            )
        # Projection.
        converted_state_dict[f"transformer_blocks.{depth}.attn1.to_out.0.weight"] = state_dict.pop(
            f"blocks.{depth}.attn.proj.weight"
        )
        converted_state_dict[f"transformer_blocks.{depth}.attn1.to_out.0.bias"] = state_dict.pop(
            f"blocks.{depth}.attn.proj.bias"
        )

        # Feed-forward.
        converted_state_dict[f"transformer_blocks.{depth}.ff.conv_inverted.weight"] = state_dict.pop(
            f"blocks.{depth}.mlp.inverted_conv.conv.weight"
        )
        converted_state_dict[f"transformer_blocks.{depth}.ff.conv_inverted.bias"] = state_dict.pop(
            f"blocks.{depth}.mlp.inverted_conv.conv.bias"
        )
        converted_state_dict[f"transformer_blocks.{depth}.ff.conv_depth.weight"] = state_dict.pop(
            f"blocks.{depth}.mlp.depth_conv.conv.weight"
        )
        converted_state_dict[f"transformer_blocks.{depth}.ff.conv_depth.bias"] = state_dict.pop(
            f"blocks.{depth}.mlp.depth_conv.conv.bias"
        )
        converted_state_dict[f"transformer_blocks.{depth}.ff.conv_point.weight"] = state_dict.pop(
            f"blocks.{depth}.mlp.point_conv.conv.weight"
        )
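        # Note: only the weight is mapped for conv_point; the pointwise conv carries no bias.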

        # Cross-attention.
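        # Here Q has its own projection, while K and V are fused into one kv_linear layer
        # that is split in half along the output dimension.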
        q = state_dict.pop(f"blocks.{depth}.cross_attn.q_linear.weight")
        q_bias = state_dict.pop(f"blocks.{depth}.cross_attn.q_linear.bias")
        k, v = torch.chunk(state_dict.pop(f"blocks.{depth}.cross_attn.kv_linear.weight"), 2, dim=0)
        k_bias, v_bias = torch.chunk(state_dict.pop(f"blocks.{depth}.cross_attn.kv_linear.bias"), 2, dim=0)

        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_q.weight"] = q
        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_q.bias"] = q_bias
        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_k.weight"] = k
        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_k.bias"] = k_bias
        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_v.weight"] = v
        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_v.bias"] = v_bias
        if qk_norm is not None:
            # Add Q/K normalization for cross-attention (attn2) - needed for Sana-Sprint and Sana-1.5
            converted_state_dict[f"transformer_blocks.{depth}.attn2.norm_q.weight"] = state_dict.pop(
                f"blocks.{depth}.cross_attn.q_norm.weight"
            )
            converted_state_dict[f"transformer_blocks.{depth}.attn2.norm_k.weight"] = state_dict.pop(
                f"blocks.{depth}.cross_attn.k_norm.weight"
            )

        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_out.0.weight"] = state_dict.pop(
            f"blocks.{depth}.cross_attn.proj.weight"
        )
        converted_state_dict[f"transformer_blocks.{depth}.attn2.to_out.0.bias"] = state_dict.pop(
            f"blocks.{depth}.cross_attn.proj.bias"
        )

    # Final block.
    converted_state_dict["proj_out.weight"] = state_dict.pop("final_layer.linear.weight")
    converted_state_dict["proj_out.bias"] = state_dict.pop("final_layer.linear.bias")
    converted_state_dict["scale_shift_table"] = state_dict.pop("final_layer.scale_shift_table")

    # Transformer
    with CTX():
        transformer_kwargs = {
            "in_channels": 32,
            "out_channels": 32,
            "num_attention_heads": model_kwargs[args.model_type]["num_attention_heads"],
            "attention_head_dim": model_kwargs[args.model_type]["attention_head_dim"],
            "num_layers": model_kwargs[args.model_type]["num_layers"],
            "num_cross_attention_heads": model_kwargs[args.model_type]["num_cross_attention_heads"],
            "cross_attention_head_dim": model_kwargs[args.model_type]["cross_attention_head_dim"],
            "cross_attention_dim": model_kwargs[args.model_type]["cross_attention_dim"],
            "caption_channels": 2304,
            "mlp_ratio": 2.5,
            "attention_bias": False,
            "sample_size": args.image_size // 32,
            "patch_size": 1,
            "norm_elementwise_affine": False,
            "norm_eps": 1e-6,
            "interpolation_scale": interpolation_scale[args.image_size],
        }

        # Add qk_norm parameter for Sana 1.5 and Sana Sprint
        if args.model_type in [
            "SanaMS1.5_1600M_P1_D20",
            "SanaMS1.5_4800M_P1_D60",
            "SanaSprint_600M_P1_D28",
            "SanaSprint_1600M_P1_D20",
        ]:
            transformer_kwargs["qk_norm"] = "rms_norm_across_heads"
        if args.model_type in ["SanaSprint_1600M_P1_D20", "SanaSprint_600M_P1_D28"]:
            transformer_kwargs["guidance_embeds"] = True

        transformer = SanaTransformer2DModel(**transformer_kwargs)

    if is_accelerate_available():
        load_model_dict_into_meta(transformer, converted_state_dict)
    else:
        transformer.load_state_dict(converted_state_dict, strict=True, assign=True)
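    # With accelerate available, the weights are loaded onto the (presumably meta-initialized)
    # transformer via diffusers' internal helper; otherwise fall back to a plain strict load.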

    # Keys present in the original checkpoint that have no diffusers counterpart.
    for unused_key in ("y_embedder.y_embedding", "pos_embed", "logvar_linear.weight", "logvar_linear.bias"):
        if state_dict.pop(unused_key, None) is None:
            print(f"{unused_key} not found in the state_dict")

    assert len(state_dict) == 0, f"State dict is not empty, {state_dict.keys()}"

    num_model_params = sum(p.numel() for p in transformer.parameters())
    print(f"Total number of transformer parameters: {num_model_params}")

    transformer = transformer.to(weight_dtype)

    if not args.save_full_pipeline:
        print(
            colored(
                f"Only saving transformer model of {args.model_type}. "
                f"Set --save_full_pipeline to save the whole Pipeline",
                "green",
                attrs=["bold"],
            )
        )
        transformer.save_pretrained(
            os.path.join(args.dump_path, "transformer"), safe_serialization=True, max_shard_size="5GB"
        )
    else:
        print(colored(f"Saving the whole Pipeline containing {args.model_type}", "green", attrs=["bold"]))
        # VAE
        ae = AutoencoderDC.from_pretrained("mit-han-lab/dc-ae-f32c32-sana-1.1-diffusers", torch_dtype=torch.float32)

        # Text Encoder
        text_encoder_model_path = "Efficient-Large-Model/gemma-2-2b-it"
        tokenizer = AutoTokenizer.from_pretrained(text_encoder_model_path)
        tokenizer.padding_side = "right"
        text_encoder = AutoModelForCausalLM.from_pretrained(
            text_encoder_model_path, torch_dtype=torch.bfloat16
        ).get_decoder()

        # Choose the appropriate pipeline and scheduler based on model type
        if args.model_type in ["SanaSprint_1600M_P1_D20", "SanaSprint_600M_P1_D28"]:
            # Force SCM Scheduler for Sana Sprint regardless of scheduler_type
            if args.scheduler_type != "scm":
                print(
                    colored(
                        f"Warning: Overriding scheduler_type '{args.scheduler_type}' to 'scm' for SanaSprint model",
                        "yellow",
                        attrs=["bold"],
                    )
                )

            # SCM Scheduler for Sana Sprint
            scheduler_config = {
                "prediction_type": "trigflow",
                "sigma_data": 0.5,
            }
            scheduler = SCMScheduler(**scheduler_config)
            pipe = SanaSprintPipeline(
                tokenizer=tokenizer,
                text_encoder=text_encoder,
                transformer=transformer,
                vae=ae,
                scheduler=scheduler,
            )
        else:
            # Original Sana scheduler
            if args.scheduler_type == "flow-dpm_solver":
                scheduler = DPMSolverMultistepScheduler(
                    flow_shift=flow_shift,
                    use_flow_sigmas=True,
                    prediction_type="flow_prediction",
                )
            elif args.scheduler_type == "flow-euler":
                scheduler = FlowMatchEulerDiscreteScheduler(shift=flow_shift)
            else:
                raise ValueError(f"Scheduler type {args.scheduler_type} is not supported")

            pipe = SanaPipeline(
                tokenizer=tokenizer,
                text_encoder=text_encoder,
                transformer=transformer,
                vae=ae,
                scheduler=scheduler,
            )

        pipe.save_pretrained(args.dump_path, safe_serialization=True, max_shard_size="5GB")
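
# Example invocation (a sketch; the flag names are assumed to mirror the args.* attributes
# used above and are defined by this script's argparse setup, which is not shown here):
#
#   python scripts/convert_sana_to_diffusers.py \
#       --orig_ckpt_path /path/to/original/sana_checkpoint.pth \
#       --model_type SanaMS_1600M_P1_D20 \
#       --image_size 1024 \
#       --scheduler_type flow-dpm_solver \
#       --save_full_pipeline \
#       --dump_path ./sana-diffusers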