in modules/url_comparison.py [0:0]
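# Imports assumed by this excerpt (the module's actual import block is not
# shown here; names are inferred from the calls below):
#
#     import sys
#     import time
#     import logging
#
#     import pandas as pd
#     from concurrent.futures import TimeoutError  # builtin TimeoutError on Python 3.11+
#     from pebble import ProcessPool
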
def process_multiple_urls(self, url_list):
"""
Function to multithread URL procesing, by parceling
out jobs to various pebble multithreading 'workers'.
We use pebble so we can enforce a timeout--sometimes a URL
points to GB-sized files, other times the website returns nothing
for hours.
input: list of URLs
output: dataframe with processed output
:param url_list: list, where each element is an URL in string
:return: pd.DataFrame, where each row contains the comparison result
for one pair of URLs.
"""
    if len(url_list) == 0:
        raise ValueError("url_list must not be empty")
    start_idx = 0
    i = 0
    results = []
    start = time.time()
    for partial_url_list in self._chunker(
            url_list, self.chunksize, start_idx):
        print(
            str(float(i) / len(url_list) * 100) + " percent complete",
            file=sys.stderr)
        print(
            "Elapsed Time: " + str(time.time() - start),
            file=sys.stderr)
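        # Note: pebble's ProcessPool.map schedules one task per URL and
        # applies `timeout` per element; if a single process_one_url call
        # exceeds it, the worker process is stopped and next() on the
        # result iterator raises TimeoutError for that element, while the
        # remaining elements can still be consumed below.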
        with ProcessPool(max_workers=self.max_worker) as pool:
            future = pool.map(
                self.process_one_url,
                partial_url_list,
                timeout=self.process_timeout
            )
            iterator = future.result()
            # Iterate over all results; if a computation timed out,
            # log it and continue to the next result.
            for index in range(len(partial_url_list)):
                try:
                    result = next(iterator)
                    results.append(result)
                except StopIteration:
                    break
                except TimeoutError as error:
                    message = \
                        "Function took longer than %d seconds" \
                        % error.args[1]
                    logging.error(message)
                    results.append(
                        self.process_one_url_empty_result(
                            partial_url_list[index], message))
                except Exception as e:
                    message = "other error: " + str(e)
                    logging.error(message)
                    results.append(
                        self.process_one_url_empty_result(
                            partial_url_list[index], message))
        i += len(partial_url_list)
    print(
        "Elapsed Time: " + str(time.time() - start),
        file=sys.stderr)
    print(
        "Rate: "
        + str(len(url_list) / (time.time() - start))
        + " urls per second",
        file=sys.stderr
    )
    url_info = pd.concat(results, axis=0).reset_index()
    return url_info
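
# Minimal usage sketch (assumptions: the owning class and its constructor
# arguments are not shown in this excerpt, so `UrlComparison` and the keyword
# names below are illustrative only):
#
#     comparator = UrlComparison(
#         chunksize=100, max_worker=4, process_timeout=30)
#     url_info = comparator.process_multiple_urls(
#         ["https://example.com/a", "https://example.com/b"])
#     print(url_info.head())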