in agora/footfall_ai_api/src/video_processor.py
def process_frames(self):
    processing_times = collections.deque(maxlen=200)
    frame_count = 0
    last_time = time.time()

    while self.running:
        frame, success = self.vs.read()
        if not success:
            continue
        # Copy only after a successful read so a failed grab cannot raise on None.
        original_frame = frame.copy()
        frame_count += 1
        current_time = time.time()
        if current_time - last_time >= 1:
            self.fps = frame_count / (current_time - last_time)
            frame_count = 0
            last_time = current_time
        if self.debug:
            cv2.putText(frame, f"FPS: {self.fps:.2f}", (10, 30),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 0), 1, cv2.LINE_AA)
        if all(point == (0, 0) for point in self.line_points):
            # No counting line configured yet; pass the frame through untouched.
            processed_frame = frame
        else:
            start_time = time.time()
            tracks = self.model.track(frame, persist=True, classes=self.classes_to_count, verbose=False)
            # Reset the counter periodically to avoid keeping stale track history.
            if time.time() - self.last_reset > 5:
                self.initialize_counter()
            processed_frame = self.counter.start_counting(frame, tracks)
            processing_time = (time.time() - start_time) * 1000
            processing_times.append(processing_time)
            avg_processing_time = np.mean(processing_times)
            inference_fps = 1000 / avg_processing_time
            if self.debug:
                cv2.putText(processed_frame, f"Inference time: {avg_processing_time:.1f}ms ({inference_fps:.1f} FPS)",
                            (10, 60), cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 0), 1, cv2.LINE_AA)
        # Queue the annotated frame in debug mode, otherwise the clean copy of the input.
        if not self.processed_frame_queue.full():
            if self.debug:
                self.processed_frame_queue.put(frame)
            else:
                self.processed_frame_queue.put(original_frame)
        # Check for inactivity
        if time.time() - self.last_activity > self.inactivity_threshold:
            if self.debug:
                print(f"Video {self.index} inactive for {self.inactivity_threshold} seconds. Stopping thread.")
            self.stop()
            break
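
This excerpt does not show how the loop is started or how frames are consumed. A minimal sketch, assuming the enclosing object is already constructed as `processor`, that `process_frames` runs on a daemon thread, and that `processed_frame_queue` is a standard `queue.Queue` (all names outside this excerpt are assumptions):

import queue
import threading

# Assumption: `processor` is an already-built instance of the class that owns
# process_frames(); its constructor and fields are not shown in this excerpt.
worker = threading.Thread(target=processor.process_frames, daemon=True)
worker.start()

while processor.running:
    try:
        annotated = processor.processed_frame_queue.get(timeout=1.0)
    except queue.Empty:
        continue
    # `annotated` is a BGR numpy array; hand it to the encoder/streaming layer here.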