in parlai/tasks/eli5/data_creation/download_reddit_qalist.py [0:0]
def main():
opt = setup_args()
output_dir = pjoin(opt['datapath'], opt['output_dir'])
### collect submissions and comments monthly URLs
date_to_url_submissions = gather_dump_urls(REDDIT_URL, "submissions")
date_to_url_comments = gather_dump_urls(REDDIT_URL, "comments")
date_to_urls = {}
for k, v in date_to_url_submissions.items():
date_to_urls[k] = (v, date_to_url_comments.get(k, ''))
### download, filter, process, remove
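    # stage the raw dump downloads in a temporary sub-directory of the output directory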
subprocess.run(['mkdir', pjoin(output_dir, 'reddit_tmp')], stdout=subprocess.PIPE)
st_time = time()
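    # mode 1: no explicit post id list, so walk the monthly dumps for the requested subreddits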
if not opt['id_list']:
subreddit_names = json.loads(opt['subreddit_list'])
output_files = dict(
[
(name, "%s/processed_data/%s_qalist.json" % (output_dir, name))
for name in subreddit_names
]
)
qa_dict = dict([(name, {}) for name in subreddit_names])
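        # resume from any per-subreddit output files saved by a previous run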
for name, fname in output_files.items():
if isfile(fname):
print("loading already processed documents from", fname)
                with open(fname) as f:
                    qa_dict[name] = dict(json.load(f))
print("loaded already processed documents")
        # get monthly reddit dumps, saving partial results to disk as it goes
for year in range(opt['start_year'], opt['end_year'] + 1):
st_month = opt['start_month'] if year == opt['start_year'] else 1
end_month = opt['end_month'] if year == opt['end_year'] else 12
months = range(st_month, end_month + 1)
for month in months:
merged_comments = 0
submissions_url, comments_url = date_to_urls[(year, month)]
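                # download and filter this month's submissions for the target subreddits (skipped when opt['answers_only'] is set)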
if not opt['answers_only']:
try:
processed_submissions = download_and_process(
submissions_url,
'submissions',
subreddit_names,
st_time,
output_dir,
)
except FileNotFoundError:
sleep(60)
print("retrying %s once" % (submissions_url))
processed_submissions = download_and_process(
submissions_url,
'submissions',
subreddit_names,
st_time,
output_dir,
)
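                    # index submissions by post id so comments can be attached to them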
for name in subreddit_names:
for dct in processed_submissions[name]:
qa_dict[name][dct['id']] = dct
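                # download this month's comments and merge them into their parent posts (skipped when opt['questions_only'] is set)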
if not opt['questions_only']:
try:
processed_comments = download_and_process(
comments_url,
'comments',
subreddit_names,
st_time,
output_dir,
)
except FileNotFoundError:
sleep(60)
print("retrying %s once" % (comments_url))
processed_comments = download_and_process(
comments_url,
'comments',
subreddit_names,
st_time,
output_dir,
)
# merge submissions and comments
for name in subreddit_names:
merged_comments = 0
for dct in processed_comments[name]:
did = dct['parent_id'].split('_')[-1]
if did in qa_dict[name]:
merged_comments += 1
comments_list = qa_dict[name][did].get(
'comments', []
) + [dct]
qa_dict[name][did]['comments'] = sorted(
comments_list,
key=lambda x: x['score'],
reverse=True,
)
print(
"----- added to global dictionary",
name,
year,
month,
time() - st_time,
merged_comments,
len(qa_dict[name]),
)
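                # checkpoint the merged question-answer data to disk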
for name, out_file_name in output_files.items():
                    with open(out_file_name, "w") as fo:
                        json.dump(list(qa_dict[name].items()), fo)
# get specific reddit posts
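    # mode 2: fetch only the posts whose ids are listed in the file given by opt['id_list']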
if opt['id_list']:
with PathManager.open(opt['id_list']) as f:
post_ids = json.load(f)
sr_names = None
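        # download the listed submissions (skipped when opt['answers_only'] is set)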
if not opt['answers_only']:
try:
sr_names, processed_submissions = download_and_process_posts(
post_ids, st_time
)
except FileNotFoundError:
sleep(60)
print("retrying %s once" % (submissions_url))
sr_names, processed_submissions = download_and_process_posts(
post_ids, st_time
)
output_files = dict(
[
(name, "%s/processed_data/%s_qalist.json" % (output_dir, name))
for name in sr_names
]
)
qa_dict = dict([(name, {}) for name in sr_names])
for name, fname in output_files.items():
if isfile(fname):
print("loading already processed documents from", fname)
                    with open(fname) as f:
                        qa_dict[name] = dict(json.load(f))
print("loaded already processed documents")
for name in sr_names:
for dct in processed_submissions[name]:
qa_dict[name][dct['id']] = dct
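        # download the comments of the listed posts (skipped when opt['questions_only'] is set)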
if not opt['questions_only']:
try:
sr_names, processed_comments = download_and_process_comments(
post_ids, st_time
)
except FileNotFoundError:
sleep(60)
print("retrying %s once" % (submissions_url))
sr_names, processed_comments = download_and_process_comments(
post_ids, st_time
)
output_files = dict(
[
(name, "%s/processed_data/%s_qalist.json" % (output_dir, name))
for name in sr_names
]
)
qa_dict = dict([(name, {}) for name in sr_names])
for name, fname in output_files.items():
if isfile(fname):
print("loading already processed documents from", fname)
                    with open(fname) as f:
                        qa_dict[name] = dict(json.load(f))
print("loaded already processed documents")
# merge submissions and comments
for name in sr_names:
merged_comments = 0
for dct in processed_comments[name]:
did = dct['parent_id'].split('_')[-1]
if did in qa_dict[name]:
merged_comments += 1
comments_list = qa_dict[name][did].get('comments', []) + [dct]
qa_dict[name][did]['comments'] = sorted(
comments_list, key=lambda x: x['score'], reverse=True
)
print(
"----- added to global dictionary",
name,
time() - st_time,
merged_comments,
len(qa_dict[name]),
)
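            # save the merged question-answer data per subreddit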
for name, out_file_name in output_files.items():
                with open(out_file_name, "w") as fo:
                    json.dump(list(qa_dict[name].items()), fo)
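    # final pass: post-process each answered thread and keep only posts that still have comments and whose URL matches the subreddit name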
if not opt['questions_only']:
for name, out_file_name in output_files.items():
qa_dct_list = [
(k, post_process(rdct, name))
for k, rdct in qa_dict[name].items()
if 'comments' in rdct
]
qa_dct_list = [
x
for x in qa_dct_list
if len(x[1]['comments']) > 0 and name in x[1]['url'].lower()
]
            with open(out_file_name, "w") as fo:
                json.dump(qa_dct_list, fo)