in finealignment/video_alignment.py [0:0]
def adjust_scene_boundaries(video_path, initial_scenes, video_id, worker_number):
"""Adjust scene boundaries based on scene detection."""
# Initialize video manager and scene manager
video_manager = VideoManager([video_path])
scene_manager = SceneManager()
scene_manager.add_detector(ContentDetector(threshold=15.0)) # Adjust threshold for sensitivity
# Start the video manager and obtain FPS
video_manager.start()
fps = video_manager.get_framerate() # Get FPS from VideoManager
# print(f"Detected FPS: {fps}")
# Get total frames using duration in seconds and fps
duration_seconds = video_manager.get_duration()[0].get_seconds()
total_frames = int(duration_seconds * fps)
last_frame_timecode = FrameTimecode(timecode=total_frames, fps=fps).get_timecode().split(".")[0].split(":")
last_frame_timecode = last_frame_timecode[1] + ":" + last_frame_timecode[2]
adjusted_scenes = []
for idx, initial_scene in enumerate(initial_scenes):
if idx == len(initial_scenes) - 1:
#Hack to avoid issues with answers that signal the last timestamp as 'end'
initial_scene['timestamps']['end_timestamp'] = last_frame_timecode
# print(last_frame_timecode)
start_timecode = time_to_frametimecode(initial_scene['timestamps']['start_timestamp'], fps, filename=video_id, worker_number = worker_number)
end_timecode = time_to_frametimecode(initial_scene['timestamps']['end_timestamp'], fps, filename=video_id, worker_number = worker_number)
# Ensure all FrameTimecode objects use the same fps
start_frame_number = int(max(0, FrameTimecode(timecode=start_timecode, fps=fps).get_frames() - 2 * fps))
end_frame_number = int(min(total_frames, FrameTimecode(timecode=end_timecode, fps=fps).get_frames() + 2 * fps))
search_start = FrameTimecode(timecode=start_frame_number, fps=fps)
search_end = FrameTimecode(timecode=end_frame_number, fps=fps)
# Seek to the start frame for detection using FrameTimecode
video_manager.seek(search_start)
scene_manager.detect_scenes(frame_source=video_manager, end_time=search_end.get_seconds())
detected_scenes = scene_manager.get_scene_list()
# Find closest detected boundaries, default to original timecodes if no match found
adjusted_start_timecode = start_timecode
adjusted_end_timecode = end_timecode
if detected_scenes:
closest_start = min(detected_scenes, key=lambda x: abs(x[0].get_frames() - FrameTimecode(timecode=start_timecode, fps=fps).get_frames()), default=None)
closest_end = min(detected_scenes, key=lambda x: abs(x[1].get_frames() - FrameTimecode(timecode=end_timecode, fps=fps).get_frames()), default=None)
if closest_start and abs(closest_start[0].get_frames() - FrameTimecode(timecode=start_timecode, fps=fps).get_frames()) < 2 * fps:
adjusted_start_timecode = closest_start[0].get_timecode()
distance = abs(closest_start[0].get_seconds() - FrameTimecode(timecode=start_timecode, fps=fps).get_seconds())
if distance > 2:
print(f"\t adjusting start timestamp by {distance:.2f} seconds")
print(f"\t\tFrom: {start_timecode} to {adjusted_start_timecode}" )
if distance >=5:
raise ValueError(f"Large start timestamp adjustment ({distance:.2f} seconds) required for scene {idx+1}")
if closest_end and abs(closest_end[1].get_frames() - FrameTimecode(timecode=end_timecode, fps=fps).get_frames()) < 2 * fps:
distance = abs(closest_end[1].get_seconds() - FrameTimecode(timecode=end_timecode, fps=fps).get_seconds())
adjusted_end_timecode = closest_end[1].get_timecode()
if distance > 2:
print(f"\t adjusting end timestamp by {distance:.2f} seconds")
print(f"\t\tFrom: {end_timecode} to {adjusted_end_timecode}" )
if distance >=5:
raise ValueError(f"Large start timestamp adjustment ({distance:.2f} seconds) required for scene {idx+1}")
# Update the JSON with FrameTimecode formatted as HH:MM:SS:FF
initial_scene['timestamps']['start_timestamp'] = adjusted_start_timecode
initial_scene['timestamps']['end_timestamp'] = adjusted_end_timecode
adjusted_scenes.append(initial_scene)
# Ensure continuity between scenes
if idx > 0:
previous_scene_end = FrameTimecode(timecode=adjusted_scenes[idx - 1]['timestamps']['end_timestamp'], fps=fps)
current_scene_start = FrameTimecode(timecode=adjusted_start_timecode, fps=fps)
# if current_scene_start.get_frames() <= previous_scene_end.get_frames():
# Set start of current scene to be exactly the frame after the end of the previous scene
new_start_timecode = previous_scene_end.get_frames() + 1
adjusted_scenes[idx]['timestamps']['start_timestamp'] = FrameTimecode(timecode=new_start_timecode, fps=fps).get_timecode()
frame_adjustment = abs(current_scene_start.get_frames() - new_start_timecode)
if frame_adjustment > 25:
print(f"\t\tWARNING: adjusting a scene start by {frame_adjustment} frames")
if frame_adjustment > 125:
raise ValueError(f"Large frame adjustment ({frame_adjustment} frames) required for scene {idx+1}")
video_manager.release()
return fps, adjusted_scenes