scripts/ft_gemma3n_image_trl.py (231 lines of code) (raw):

""" Train Gemma-3n on various vision-language datasets including intersection-dataset. For Gemma-3n with intersection dataset: accelerate launch \ --config_file examples/accelerate_configs/deepspeed_zero3.yaml \ sft_vlm_gemma3n.py \ --dataset_name ariG23498/intersection-dataset \ --model_name_or_path google/gemma-3n-E2B-it \ --per_device_train_batch_size 1 \ --gradient_accumulation_steps 1 \ --output_dir gemma-3n-E2B-it-trl-sft-intersection \ --bf16 \ --torch_dtype bfloat16 \ --use_peft \ --lora_target_modules all-linear \ --attn_implementation eager Train Gemma-3n on the HuggingFaceH4/llava-instruct-mix-vsft dataset (single-image). accelerate launch \ --config_file examples/accelerate_configs/deepspeed_zero3.yaml \ sft_vlm_gemma3n.py \ --dataset_name HuggingFaceH4/llava-instruct-mix-vsft \ --model_name_or_path google/gemma-3-4b-it \ --per_device_train_batch_size 1 \ --gradient_accumulation_steps 1 \ --output_dir gemma-3-4b-it-trl-sft-llava-instruct-mix-vsft \ --bf16 \ --torch_dtype bfloat16 \ --use_peft \ --lora_target_modules all-linear \ --attn_implementation eager Train Gemma-3n on the FanqingM/MMIU-Benchmark dataset (multi-image). accelerate launch \ --config_file examples/accelerate_configs/deepspeed_zero3.yaml \ sft_vlm_gemma3n.py \ --dataset_name FanqingM/MMIU-Benchmark \ --dataset_train_split test \ --model_name_or_path google/gemma-3-4b-it \ --per_device_train_batch_size 1 \ --gradient_accumulation_steps 1 \ --output_dir gemma-3-4b-it-trl-sft-MMIU-Benchmark \ --bf16 \ --torch_dtype bfloat16 \ --use_peft \ --lora_target_modules all-linear --attn_implementation eager """ import io import os import zipfile import torch from datasets import DatasetDict, load_dataset from huggingface_hub import hf_hub_download, list_repo_files from PIL import Image from transformers import (AutoModelForImageTextToText, AutoProcessor, Gemma3nForConditionalGeneration) from trl import (ModelConfig, ScriptArguments, SFTConfig, SFTTrainer, TrlParser, get_kbit_device_map, get_quantization_config) def my_get_peft_config(model_args: ModelConfig): """A version of get_peft_config that handles comma-separated target modules""" if model_args.use_peft is False: return None # Import here to avoid issues if PEFT is not available try: from peft import LoraConfig except ImportError: raise ValueError( "You need to have PEFT library installed in your environment, make sure to install `peft`. " "Make sure to run `pip install -U peft`." ) # Fix the target_modules to be a list if it's a comma-separated string target_modules = model_args.lora_target_modules if isinstance(target_modules, str) and target_modules != "all-linear": # Convert comma-separated string to list target_modules = [module.strip() for module in target_modules.split(",")] peft_config = LoraConfig( task_type=model_args.lora_task_type, r=model_args.lora_r, target_modules=target_modules, lora_alpha=model_args.lora_alpha, lora_dropout=model_args.lora_dropout, bias="none", use_rslora=model_args.use_rslora, use_dora=model_args.use_dora, modules_to_save=model_args.lora_modules_to_save, ) return peft_config # For intersection dataset processing def format_intersection_data(samples: dict) -> dict[str, list]: """Format intersection dataset to match expected message format""" formatted_samples = {"messages": []} for idx in range(len(samples["image"])): image = samples["image"][idx].convert("RGB") label = str(samples["label"][idx]) message = [ { "role": "system", "content": [ { "type": "text", "text": "You are an assistant with great geometry skills.", } ], }, { "role": "user", "content": [ {"type": "image", "image": image}, { "type": "text", "text": "How many intersection points are there in the image?", }, ], }, {"role": "assistant", "content": [{"type": "text", "text": label}]}, ] formatted_samples["messages"].append(message) return formatted_samples # For multi-image example def process_vision_info(messages: list[dict]) -> list[Image.Image]: image_inputs = [] for msg in messages: content = msg.get("content", []) if not isinstance(content, list): content = [content] for element in content: if isinstance(element, dict) and ( "image" in element or element.get("type") == "image" ): if "image" in element: image = element["image"] else: image = element if image is not None: # Handle dictionary with bytes if isinstance(image, dict) and "bytes" in image: pil_image = Image.open(io.BytesIO(image["bytes"])) image_inputs.append(pil_image.convert("RGB")) # Handle PIL Image objects elif hasattr(image, "convert"): image_inputs.append(image.convert("RGB")) return image_inputs def format_data(samples: dict) -> dict[str, list]: formatted_samples = {"messages": []} for cont in range(len(samples["question"])): images = [] for img_path in samples["input_image_path"][cont]: try: with open(img_path, "rb") as f: img_bytes = f.read() image = Image.open(io.BytesIO(img_bytes)).convert("RGB") images.append({"type": "image", "image": image}) except Exception as e: print(f"Error processing image {img_path}: {e}") continue formatted_samples["messages"].append( [ { "role": "system", "content": [{"type": "text", "text": samples["context"][cont]}], }, { "role": "user", "content": images + [{"type": "text", "text": samples["question"][cont]}], }, { "role": "assistant", "content": [{"type": "text", "text": samples["output"][cont]}], }, ] ) return formatted_samples # For multi-image example def prepare_dataset( dataset: DatasetDict, dataset_name: str, dataset_train_split: str ) -> DatasetDict: all_files = list_repo_files(dataset_name, repo_type="dataset") zip_files = [f for f in all_files if f.endswith(".zip")] for zip_filename in zip_files: zip_path = hf_hub_download( repo_id=dataset_name, filename=zip_filename, repo_type="dataset" ) extract_folder = zip_filename.replace(".zip", "") os.makedirs(extract_folder, exist_ok=True) with zipfile.ZipFile(zip_path, "r") as zip_ref: zip_ref.extractall(extract_folder) dataset = dataset.map(format_data, batched=True, batch_size=4, num_proc=16) return dataset def main(): parser = TrlParser((ScriptArguments, SFTConfig, ModelConfig)) script_args, training_args, model_args = parser.parse_args_and_config() training_args.gradient_checkpointing_kwargs = dict(use_reentrant=False) training_args.remove_unused_columns = False training_args.dataset_kwargs = {"skip_prepare_dataset": True} ################ # Model, Tokenizer & Processor ################ torch_dtype = ( model_args.torch_dtype if model_args.torch_dtype in ["auto", None] else getattr(torch, model_args.torch_dtype) ) quantization_config = get_quantization_config(model_args) model_kwargs = dict( revision=model_args.model_revision, attn_implementation=model_args.attn_implementation, torch_dtype=torch_dtype, device_map=get_kbit_device_map() if quantization_config is not None else None, quantization_config=quantization_config, ) processor = AutoProcessor.from_pretrained( model_args.model_name_or_path, trust_remote_code=model_args.trust_remote_code ) processor.tokenizer.padding_side = "right" # Use appropriate model class based on model name if "gemma-3n" in model_args.model_name_or_path.lower(): model = Gemma3nForConditionalGeneration.from_pretrained( model_args.model_name_or_path, trust_remote_code=model_args.trust_remote_code, **model_kwargs, ) else: model = AutoModelForImageTextToText.from_pretrained( model_args.model_name_or_path, trust_remote_code=model_args.trust_remote_code, **model_kwargs, ) def collate_fn(examples): texts = [] images_list = [] for example in examples: # Apply chat template to get text text = processor.apply_chat_template( example["messages"], tokenize=False, add_generation_prompt=False ).strip() texts.append(text) # Extract images if "images" in example: # single-image case images = [img.convert("RGB") for img in example["images"]] else: # multi-image case or intersection dataset images = process_vision_info(example["messages"]) images_list.append(images) # Tokenize the texts and process the images batch = processor( text=texts, images=images_list, return_tensors="pt", padding=True ) # The labels are the input_ids, and we mask the padding tokens in the loss computation labels = batch["input_ids"].clone() # Mask tokens for Gemma3n model if "gemma-3n" in model_args.model_name_or_path.lower(): # Use Gemma3n specific token masking labels[labels == processor.tokenizer.pad_token_id] = -100 if hasattr(processor.tokenizer, "image_token_id"): labels[labels == processor.tokenizer.image_token_id] = -100 if hasattr(processor.tokenizer, "boi_token_id"): labels[labels == processor.tokenizer.boi_token_id] = -100 if hasattr(processor.tokenizer, "eoi_token_id"): labels[labels == processor.tokenizer.eoi_token_id] = -100 else: # Original masking for other models image_token_id = [ processor.tokenizer.convert_tokens_to_ids( processor.tokenizer.special_tokens_map["boi_token"] ) ] labels[labels == processor.tokenizer.pad_token_id] = -100 labels[labels == image_token_id] = -100 labels[labels == 262144] = -100 batch["labels"] = labels return batch ################ # Dataset ################ dataset = load_dataset(script_args.dataset_name, name=script_args.dataset_config) # Handle different dataset formats if script_args.dataset_name == "FanqingM/MMIU-Benchmark": dataset = prepare_dataset( dataset, script_args.dataset_name, script_args.dataset_train_split ) elif script_args.dataset_name == "ariG23498/intersection-dataset": # Format intersection dataset dataset = dataset.map( format_intersection_data, batched=True, batch_size=4, num_proc=4 ) ################ # Training ################ trainer = SFTTrainer( model=model, args=training_args, data_collator=collate_fn, train_dataset=dataset[script_args.dataset_train_split], eval_dataset=dataset[script_args.dataset_test_split] if training_args.eval_strategy != "no" else None, processing_class=processor.tokenizer, peft_config=my_get_peft_config(model_args), ) trainer.train() # Save and push to hub trainer.save_model(training_args.output_dir) if training_args.push_to_hub: trainer.push_to_hub(dataset_name=script_args.dataset_name) if trainer.accelerator.is_main_process: processor.push_to_hub(training_args.hub_model_id) if __name__ == "__main__": main()