# Source: src/python/detectors/semaphore_overflow_prevention/semaphore_overflow_prevention.py [0:0]
def post_tasks_compliant(jobs, es_url):
    """Consume image tasks from a joinable queue and POST each one.

    Every item taken from ``jobs`` is unpacked as
    ``(image, image_name, tag)``. The upload target is
    ``es_url.format(image_name)`` and the request carries ``image.content``
    under the ``'file'`` key and ``tag`` under the ``'tag'`` key.

    Args:
        jobs: Queue to drain. It must provide ``get()`` and ``task_done()``
            (for example a ``multiprocessing.JoinableQueue``). When ``None``,
            a new empty ``JoinableQueue`` is created instead.
        es_url: Format string with one positional placeholder that is
            filled with the image name.

    Note:
        The loop has no exit condition; the function only stops when an
        exception propagates out of ``get()``, the task unpacking, or the
        upload.
    """
    import multiprocessing
    import requests

    # Use the queue supplied by the caller. Unconditionally rebinding `jobs`
    # to a brand-new queue would discard the argument and leave get()
    # blocked forever on a queue nobody feeds.
    if jobs is None:
        jobs = multiprocessing.JoinableQueue()

    while True:
        # Fetch outside the try block: if get() itself fails, no task was
        # removed, so task_done() must not run (an extra call raises
        # ValueError and unbalances join()).
        task = jobs.get()
        try:
            image, image_name, tag = task
            formatted_es_url = es_url.format(image_name)
            files = {'file': image.content, 'tag': tag}
            requests.post(formatted_es_url, files=files)
        finally:
            # Compliant: calls JoinableQueue.task_done()
            # for each task removed from the JoinableQueue.
            jobs.task_done()