vision/smolvlm2/smolvlm/datasets/dataset.py [427:561]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        self.max_frames = getattr(data_args, "max_frames", 25)
        self.video_target_size = getattr(data_args, "video_target_size", 384)
        self.image_target_size = getattr(data_args, "image_target_size", 1536)
        self.data_folder = getattr(data_args, "data_folder", "")

        subdir = dataset_args.get("path", "")
        self.mm_path = os.path.join(self.data_folder, subdir)

        self.name = dataset_args.get("name", "unnamed_dataset")
        self.modality = dataset_args.get("modality", "unknown")
        self.source_fps = dataset_args.get("source_fps", 1)

        data_path = dataset_args["json_path"]
        self.list_data_dict = self._load_data(data_path)

        sampling_strategy = dataset_args.get("sampling_strategy", "all")
        self._apply_sampling_strategy(sampling_strategy)

        logger.info(
            f"[SupervisedDataset: {self.name}] - Label Masking Logic. "
            f"\nmask_user_tokens: {self.mask_user_tokens}, mask_system_tokens: {self.mask_system_tokens}\n"
        )
        logger.info(
            f"[SupervisedDataset: {self.name}] Final dataset size: {len(self.list_data_dict)}\n"
            f"Dataset Arguments - FPS: {self.target_fps}, "
            f"Max Frames: {self.max_frames}, "
            f"Video Target Size: {self.video_target_size}, "
            f"Image Target Size: {self.image_target_size}"
        )

    def _load_data(self, json_path: str) -> List[Dict[str, Any]]:
        """Load annotation records from a ``.json`` (single array) or ``.jsonl`` file.

        Args:
            json_path: Path to the annotation file.

        Returns:
            List of per-sample annotation dicts.

        Raises:
            FileNotFoundError: If ``json_path`` does not exist.
            ValueError: If the extension is neither ``.json`` nor ``.jsonl``.
        """
        if not os.path.isfile(json_path):
            raise FileNotFoundError(f"File not found: {json_path}")

        if json_path.endswith(".json"):
            with open(json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        elif json_path.endswith(".jsonl"):
            data = []
            with open(json_path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    # Skip blank lines (e.g. a trailing newline at EOF);
                    # json.loads("") would raise JSONDecodeError.
                    if line:
                        data.append(json.loads(line))
        else:
            raise ValueError(f"Unsupported file format: {json_path}")

        logger.info(f"[{self.name}] Loaded {len(data)} items from {json_path}")
        return data

    def _apply_sampling_strategy(self, strategy: str):
        """Subsample ``self.list_data_dict`` in place according to *strategy*.

        Supported forms:
          * ``"all"`` (or any string without ``:``): keep everything.
          * ``"first:N"`` / ``"end:N"`` / ``"random:N"`` where ``N`` is either
            an absolute count or a percentage such as ``"10%"``.
        """
        if strategy == "all" or ":" not in strategy:
            return

        # maxsplit=1 so a malformed extra ':' lands in amount_str (failing
        # loudly in int()) instead of raising an opaque unpacking error here.
        kind, amount_str = strategy.split(":", 1)
        total = len(self.list_data_dict)

        if amount_str.endswith("%"):
            pct = float(amount_str.strip("%"))
            # ceil + max(1, ...) so tiny percentages still keep one sample.
            sampling_number = max(1, math.ceil(total * pct / 100.0))
        else:
            sampling_number = int(amount_str)

        if kind == "first":
            self.list_data_dict = self.list_data_dict[:sampling_number]
        elif kind == "end":
            self.list_data_dict = self.list_data_dict[-sampling_number:]
        elif kind == "random":
            # Private fixed-seed generator: same reproducible shuffle as the
            # old `random.seed(42); random.shuffle(...)`, but without
            # clobbering the process-global `random` state.
            random.Random(42).shuffle(self.list_data_dict)
            self.list_data_dict = self.list_data_dict[:sampling_number]
        else:
            logger.warning(f"[{self.name}] Unknown sampling kind '{kind}'; keeping all samples.")

        logger.info(f"[{self.name}] after subsampling '{strategy}': {len(self.list_data_dict)} remain.")
    
    def __len__(self) -> int:
        """Return the number of samples currently in the dataset."""
        dataset_size = len(self.list_data_dict)
        return dataset_size
        
    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        """Fetch sample *i*, retrying on failure and falling back to other samples.

        Strategy: retry the requested index a few times (transient cloud-disk
        errors often succeed on a second attempt), then try random *other*
        indices (in case this specific file is corrupted), then make one last
        attempt on the original index and let the exception propagate.
        """
        # TODO: define number of retries somewhere else
        num_base_retries = 3

        # try the current sample first
        for attempt_idx in range(num_base_retries):
            try:
                return self._get_item(i)
            except Exception as e:
                # sleep 1s in case it is a cloud disk issue
                print(f"[Try #{attempt_idx}] Failed to fetch sample {i}. Exception:", e)
                time.sleep(1)

        # Try other samples, in case it is a file-corruption issue.
        # Seed a private generator ONCE per call: the previous code re-seeded
        # the global RNG with 42 inside the loop, so every retry drew the
        # exact same "random" index (and perturbed global random state).
        rng = random.Random(i)
        for attempt_idx in range(num_base_retries):
            # Choose the index before the try-block so it is always bound
            # when the except message below formats it.
            next_index = rng.randrange(len(self.list_data_dict))
            try:
                return self._get_item(next_index)
            except Exception as e:
                # no need to sleep
                print(f"[Try other #{attempt_idx}] Failed to fetch sample {next_index}. Exception:", e)

        # Final attempt: let any exception propagate to the caller.
        return self._get_item(i)
            
    def _get_item(self, idx: int) -> Dict[str, torch.Tensor]:
        sources = self.list_data_dict[idx]
        if isinstance(idx, int):
            sources = [sources]
    
        content_type = sources[0].get("type", self.modality).lower()
        frames: List[Image.Image] = []
        timestamps: List[str] = []
        duration_seconds = None
        
        if content_type == "video":
            ## load videos
            #self.processor.image_processor.size = (self.video_target_size, self.video_target_size)
            self.processor.image_processor.size = {"longest_edge": self.video_target_size}
            self.processor.image_processor.do_resize = True
            self.processor.image_processor.do_image_splitting = False
            media = sources[0].get("video") or sources[0].get("image")
            if media:
                path = os.path.join(self.mm_path, media)
                if os.path.isdir(path):
                    ## TODO: can we simplify this logic??
                    frames, timestamps, duration_seconds = load_image_directory_as_frames(
                        folder_path=path,
                        source_fps=self.source_fps,
                        target_fps=self.target_fps,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



vision/smolvlm2/smolvlm/datasets/dataset_clip_sampling.py [559:694]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        self.max_frames = getattr(data_args, "max_frames", 25)
        self.video_target_size = getattr(data_args, "video_target_size", 384)
        self.image_target_size = getattr(data_args, "image_target_size", 1536)
        self.data_folder = getattr(data_args, "data_folder", "")

        subdir = dataset_args.get("path", "")
        self.mm_path = os.path.join(self.data_folder, subdir)

        self.name = dataset_args.get("name", "unnamed_dataset")
        self.modality = dataset_args.get("modality", "unknown")
        self.source_fps = dataset_args.get("source_fps", 1)

        data_path = dataset_args["json_path"]
        self.list_data_dict = self._load_data(data_path)

        sampling_strategy = dataset_args.get("sampling_strategy", "all")
        self._apply_sampling_strategy(sampling_strategy)

        logger.info(
            f"[SupervisedDataset: {self.name}] - Label Masking Logic. "
            f"\nmask_user_tokens: {self.mask_user_tokens}, mask_system_tokens: {self.mask_system_tokens}\n"
        )
        logger.info(
            f"[SupervisedDataset: {self.name}] Final dataset size: {len(self.list_data_dict)}\n"
            f"Dataset Arguments - FPS: {self.target_fps}, "
            f"Max Frames: {self.max_frames}, "
            f"Video Target Size: {self.video_target_size}, "
            f"Image Target Size: {self.image_target_size}"
        )

    def _load_data(self, json_path: str) -> List[Dict[str, Any]]:
        """Load annotation records from a ``.json`` (single array) or ``.jsonl`` file.

        Args:
            json_path: Path to the annotation file.

        Returns:
            List of per-sample annotation dicts.

        Raises:
            FileNotFoundError: If ``json_path`` does not exist.
            ValueError: If the extension is neither ``.json`` nor ``.jsonl``.
        """
        if not os.path.isfile(json_path):
            raise FileNotFoundError(f"File not found: {json_path}")

        if json_path.endswith(".json"):
            with open(json_path, "r", encoding="utf-8") as f:
                data = json.load(f)
        elif json_path.endswith(".jsonl"):
            data = []
            with open(json_path, "r", encoding="utf-8") as f:
                for line in f:
                    line = line.strip()
                    # Skip blank lines (e.g. a trailing newline at EOF);
                    # json.loads("") would raise JSONDecodeError.
                    if line:
                        data.append(json.loads(line))
        else:
            raise ValueError(f"Unsupported file format: {json_path}")

        logger.info(f"[{self.name}] Loaded {len(data)} items from {json_path}")
        return data

    def _apply_sampling_strategy(self, strategy: str):
        """Subsample ``self.list_data_dict`` in place according to *strategy*.

        Supported forms:
          * ``"all"`` (or any string without ``:``): keep everything.
          * ``"first:N"`` / ``"end:N"`` / ``"random:N"`` where ``N`` is either
            an absolute count or a percentage such as ``"10%"``.
        """
        if strategy == "all" or ":" not in strategy:
            return

        # maxsplit=1 so a malformed extra ':' lands in amount_str (failing
        # loudly in int()) instead of raising an opaque unpacking error here.
        kind, amount_str = strategy.split(":", 1)
        total = len(self.list_data_dict)

        if amount_str.endswith("%"):
            pct = float(amount_str.strip("%"))
            # ceil + max(1, ...) so tiny percentages still keep one sample.
            sampling_number = max(1, math.ceil(total * pct / 100.0))
        else:
            sampling_number = int(amount_str)

        if kind == "first":
            self.list_data_dict = self.list_data_dict[:sampling_number]
        elif kind == "end":
            self.list_data_dict = self.list_data_dict[-sampling_number:]
        elif kind == "random":
            # Private fixed-seed generator: same reproducible shuffle as the
            # old `random.seed(42); random.shuffle(...)`, but without
            # clobbering the process-global `random` state.
            random.Random(42).shuffle(self.list_data_dict)
            self.list_data_dict = self.list_data_dict[:sampling_number]
        else:
            logger.warning(f"[{self.name}] Unknown sampling kind '{kind}'; keeping all samples.")

        logger.info(f"[{self.name}] after subsampling '{strategy}': {len(self.list_data_dict)} remain.")
    
    def __len__(self) -> int:
        """Return the number of samples currently in the dataset."""
        dataset_size = len(self.list_data_dict)
        return dataset_size
        
    def __getitem__(self, i) -> Dict[str, torch.Tensor]:
        """Fetch sample *i*, retrying on failure and falling back to other samples.

        Strategy: retry the requested index a few times (transient cloud-disk
        errors often succeed on a second attempt), then try random *other*
        indices (in case this specific file is corrupted), then make one last
        attempt on the original index and let the exception propagate.
        """
        # TODO: define number of retries somewhere else
        num_base_retries = 3

        # try the current sample first
        for attempt_idx in range(num_base_retries):
            try:
                return self._get_item(i)
            except Exception as e:
                # sleep 1s in case it is a cloud disk issue
                print(f"[Try #{attempt_idx}] Failed to fetch sample {i}. Exception:", e)
                time.sleep(1)

        # Try other samples, in case it is a file-corruption issue.
        # Seed a private generator ONCE per call: the previous code re-seeded
        # the global RNG with 42 inside the loop, so every retry drew the
        # exact same "random" index (and perturbed global random state).
        rng = random.Random(i)
        for attempt_idx in range(num_base_retries):
            # Choose the index before the try-block so it is always bound
            # when the except message below formats it.
            next_index = rng.randrange(len(self.list_data_dict))
            try:
                return self._get_item(next_index)
            except Exception as e:
                # no need to sleep
                print(f"[Try other #{attempt_idx}] Failed to fetch sample {next_index}. Exception:", e)

        # Final attempt: let any exception propagate to the caller.
        return self._get_item(i)
            
    def _get_item(self, idx: int) -> Dict[str, torch.Tensor]:
        sources = self.list_data_dict[idx]
        if isinstance(idx, int):
            sources = [sources]
            
        content_type = sources[0].get("type", self.modality).lower()

        frames: List[Image.Image] = []
        timestamps: List[str] = []
        duration_seconds = None
        
        if content_type == "video":
            ## load videos
            #self.processor.image_processor.size = (self.video_target_size, self.video_target_size)
            self.processor.image_processor.size = {"longest_edge": self.video_target_size}
            self.processor.image_processor.do_resize = True
            self.processor.image_processor.do_image_splitting = False
            media = sources[0].get("video") or sources[0].get("image")
            if media:
                path = os.path.join(self.mm_path, media)
                if os.path.isdir(path):
                    ## TODO: can we simplify this logic??
                    frames, timestamps, duration_seconds = load_image_directory_as_frames(
                        folder_path=path,
                        source_fps=self.source_fps,
                        target_fps=self.target_fps,
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



