in mozetl/taar/taar_amodump.py [0:0]
def _fetch_versions(self, addon_map):
    """Fetch AMO version metadata for every addon GUID in *addon_map*.

    Builds one paginated ``/versions/`` listing URL per GUID and fetches
    them in batches of 500 through the request pool.  The listing
    responses yield "last page" URLs, which are then fetched so that
    ``_handle_last_version_responses`` can record each addon's latest
    version (presumably merging it into ``addon_map`` in place — confirm
    against that handler).  Requests that raised are retried exactly once
    via ``pool.Pool.from_exceptions``.

    :param addon_map: mapping of addon GUID -> addon metadata dict.
    """
    logger.info("Processing Version urls")
    q = queue.Queue()
    logger.info("Filling initial version page queue")

    def iter_version_urls(guid_map):
        # One AMO v3 versions-listing URL per addon GUID.
        for guid in list(guid_map.keys()):
            yield "https://addons.mozilla.org/api/v3/addons/addon/%s/versions/" % guid

    def chunker(seq, size):
        # Group an iterable into lists of at most `size` elements.
        collector = []
        for term in seq:
            collector.append(term)
            if len(collector) == size:
                yield collector
                collector = []
        # Yield any dangling records we collected
        if collector:
            yield collector

    total_processed_addons = 0
    for chunk in chunker(iter_version_urls(addon_map), 500):
        for url in chunk:
            q.put({"method": "GET", "url": url, "timeout": 2.0})

        logger.info("Queue setup - processing initial version page requests")
        # Lazy %-args: formatting only happens if INFO is enabled.
        logger.info("%d requests to process", q.qsize())

        p = pool.Pool(q, num_processes=self._max_processes)
        p.join_all()

        logger.info("Pool completed - processing responses")
        last_page_urls = self._handle_version_responses(p)
        logger.info("Captured %d last page urls", len(last_page_urls))
        total_processed_addons += len(last_page_urls)

        # Try processing the exceptions once
        p = pool.Pool.from_exceptions(
            p.exceptions(), num_processes=self._max_processes
        )
        p.join_all()
        last_page_urls.extend(self._handle_version_responses(p))

        # Now fetch the last version of each addon
        logger.info("Processing last page urls: %d", len(last_page_urls))
        p = pool.Pool.from_urls(last_page_urls, num_processes=self._max_processes)
        p.join_all()
        self._handle_last_version_responses(p, addon_map)

        # Try processing exceptions once
        p = pool.Pool.from_exceptions(
            p.exceptions(), num_processes=self._max_processes
        )
        p.join_all()
        self._handle_last_version_responses(p, addon_map)

        # Cumulative count across all chunks processed so far.
        logger.info(
            "Processed %d addons with version info", total_processed_addons
        )