in test.py [0:0]
def process_video_from_file(file_path, prepare_queue, inference_queue):
    """Extract frames from a video file and return their class predictions.

    Frames are read with ``cv2.VideoCapture`` (capped by the module-level
    ``FRAME_MAX``), handed to ``add_to_prepare`` on a background thread, and
    one item per extracted frame is then taken from ``inference_queue``.
    Items are grouped into batches of ``FRAME_BATCH`` (the last batch may be
    smaller) and passed through ``get_predictions_from_image_array`` and
    ``get_classes_with_scores``.

    Args:
        file_path: Path of the video file passed to ``cv2.VideoCapture``.
        prepare_queue: Queue forwarded to ``add_to_prepare`` together with
            the extracted frames.
        inference_queue: Queue this function blocks on with ``get()``, once
            per extracted frame.
            NOTE(review): presumably filled by a downstream worker that
            consumes ``prepare_queue`` -- confirm against the caller.

    Returns:
        A flat list of strings: ``str(score)`` followed by ``'\\n'`` for each
        value yielded by ``get_classes_with_scores``. Empty when no frame
        could be read from the file.
    """
    log.info('process_video_from_file')
    frames = []
    vidcap = cv2.VideoCapture(file_path)
    try:
        log.info('start frame extraction')
        while True:
            success, frame = vidcap.read()
            # Trust the status returned by read(): a failed read (unopenable
            # file, end of stream) must not contribute a frame.
            if not success:
                break
            frames.append(frame)
            # Equality (not >=) is deliberate: a non-positive FRAME_MAX never
            # matches, so extraction stays uncapped exactly as before.
            if len(frames) == FRAME_MAX:
                break
        log.info('end frame extraction')
    finally:
        # All frames are already held in memory, so the capture can be
        # released now, even if extraction raised.
        vidcap.release()

    count = len(frames)
    add_worker = threading.Thread(target=add_to_prepare, args=(prepare_queue, frames,))
    add_worker.start()
    log.info('frame count: %d', count)

    batch = []
    predictions = []
    log.info('frame batch %d', FRAME_BATCH)
    for i in range(count):
        # Blocks until a result for the next frame is available.
        batch.append(inference_queue.get())
        # Flush on a full batch, or on the final frame so a partial trailing
        # batch is not dropped.
        if len(batch) == FRAME_BATCH or i == (count - 1):
            log.info('range: %d - batch: %d', i, len(batch))
            for v in get_classes_with_scores(get_predictions_from_image_array(batch)):
                predictions.append(str(v))
                predictions.append('\n')
            batch.clear()
    return predictions