agora/shopper_insights_api/src/video_processor.py
def process_frames(self):
    """
    Main processing loop: runs YOLOv8 person tracking plus age detection on
    each frame, annotates it, and queues the result for downstream consumers.
    """
    frame_count = 0
    last_time = time.time()
    while self.running:
        # Assumes self.vs follows cv2.VideoCapture semantics: read() -> (success, frame)
        success, frame = self.vs.read()
        if not success:
            continue  # Skip transient read failures / empty frames
        # FPS calculation (updated once per second)
        frame_count += 1
        current_time = time.time()
        if current_time - last_time >= 1:
            self.fps = frame_count / (current_time - last_time)
            frame_count = 0
            last_time = current_time
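        # Note: self.fps measures end-to-end loop throughput (capture included),
        # while inference_fps below is derived from per-frame processing time only.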
        start_time = time.time()

        # Run YOLOv8 detection with tracking (COCO class 0 = person)
        results = self.model.track(
            frame,
            persist=True,  # Preserve track IDs across frames
            classes=[0],   # Only track persons
            conf=self.min_confidence,
        )[0]
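        # track() returns a list of ultralytics Results; [0] is this frame's
        # result. Its .boxes exposes coordinate tensors, confidences, and
        # tracker IDs (.id), which stays None until tracks are established.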
        # Process tracked objects
        if results.boxes.id is not None:
            boxes = results.boxes.xywh.cpu()  # Boxes as (center-x, center-y, w, h)
            track_ids = results.boxes.id.cpu().numpy()
            confidences = results.boxes.conf.cpu().numpy()
            self.detected_persons = len(track_ids)
            self.current_shoppers_hashes = []
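            # Both fields are rebuilt on every frame; presumably they back the
            # shopper-insights API responses served elsewhere in this module.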
            # Process each detection
            for box, track_id, conf in zip(boxes, track_ids, confidences):
                x, y, w, h = box
                track_id = int(track_id)
                # Convert from center-based xywh to corner-based (x1, y1, x2, y2)
                bbox = [
                    int(x - w / 2), int(y - h / 2),  # x1, y1
                    int(x + w / 2), int(y + h / 2),  # x2, y2
                ]
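                # e.g. a box centered at (320, 240) with w=100, h=200
                # becomes bbox = [270, 140, 370, 340]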
                # Generate a short pseudonymous ID for the person (MD5 here is
                # a label, not a security measure)
                person_hash = hashlib.md5(f"{track_id}".encode()).hexdigest()[:8]
                self.current_shoppers_hashes.append(person_hash)
                # Extract and update age
                age = self.extract_age(frame, bbox)
                self.update_age_stats(person_hash, age)
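                # extract_age and update_age_stats are defined elsewhere in the
                # class; the latter presumably aggregates estimates per person
                # hash so one noisy frame doesn't skew the reported age.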
                # Update area presence if any areas are set
                if self.restricted_areas:
                    self.update_area_presence(person_hash, age, bbox, frame.shape, current_time)
                # Draw bounding box and label
                color = (0, 255, 0)
                cv2.rectangle(frame, (bbox[0], bbox[1]), (bbox[2], bbox[3]), color, 2)
                cv2.putText(frame, f"ID: {person_hash} Age: {age}",
                            (bbox[0], bbox[1] - 10),
                            cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 2)
        # Draw restricted areas if any are set
        if self.restricted_areas:
            self.draw_restricted_areas(frame)
        # Calculate and display performance metrics
        processing_time = (time.time() - start_time) * 1000  # ms for this frame
        self.processing_times.append(processing_time)
        avg_processing_time = np.mean(self.processing_times)
        # Guard against division by zero on very fast frames
        inference_fps = 1000 / avg_processing_time if avg_processing_time > 0 else 0.0
        if self.debug:
            self.draw_debug_info(frame, avg_processing_time, inference_fps)
        cv2.putText(frame, f"FPS: {self.fps:.2f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.75, (255, 255, 0), 1, cv2.LINE_AA)
        # Queue the processed frame, dropping it if the consumer is behind
        if not self.processed_frame_queue.full():
            self.processed_frame_queue.put(frame)

        # Stop the loop after a period with no consumer activity
        if time.time() - self.last_activity > self.inactivity_threshold:
            if self.debug:
                print(f"Inactive for {self.inactivity_threshold} seconds. Stopping.")
            self.stop()
            break
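
# --- Usage sketch (illustrative; everything below this line is an assumption,
# including the VideoProcessor class name and its constructor signature) ---
# process_frames is meant to run on a background thread while a consumer
# drains processed_frame_queue, e.g. to serve frames over HTTP or display
# them locally. A minimal sketch, assuming cv2 is available for display:
#
#   import threading
#   import cv2
#
#   processor = VideoProcessor(...)  # hypothetical constructor arguments
#   threading.Thread(target=processor.process_frames, daemon=True).start()
#   try:
#       while True:
#           frame = processor.processed_frame_queue.get()
#           cv2.imshow("shopper insights", frame)
#           if cv2.waitKey(1) & 0xFF == ord("q"):
#               break
#   finally:
#       processor.stop()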