def main()

in parlai/tasks/eli5/data_creation/download_support_docs.py

Downloads a slice of Common Crawl WET files and extracts the pages whose
CommonCrawl record IDs (or URLs) were pre-selected as ELI5 support documents,
writing the collected articles and a progress marker to disk.

def main():
    opt = setup_args()
    output_dir = pjoin(
        opt['datapath'], opt['output_dir'], 'processed_data/collected_docs'
    )
    wet_urls_path = pjoin(opt['datapath'], opt['output_dir'], opt['wet_urls'])
    f = open(wet_urls_path, buffering=4096)
    url_lst = [line.strip() for line in f if line.strip() != '']
    f.close()
    if opt['urls']:
        with PathManager.open(opt['urls']) as f:
            specific_urls = json.load(f)
        using_specific_urls = True
        using_specific_ids = False
    elif opt['ccuids']:
        with PathManager.open(opt['ccuids']) as f:
            specific_ids = json.load(f)
        using_specific_urls = False
        using_specific_ids = True
    else:
        using_specific_urls = False
        using_specific_ids = False
    print("loading URL selection")
    ccrawl_ids_maps = {}
    reddit_id_group = {}
    sr_names = json.loads(opt['subreddit_names'])
    # make a list of the CommonCrawl UIDs or URLs we want to process and keep
    if using_specific_urls:
        select_urls = select_specific_urls(specific_urls)
    elif using_specific_ids:
        select_ccid = select_specific_ids(specific_ids)
    else:
        for name in sr_names:
            print(name)
            ccrawl_ids_maps[name] = json.load(
                open('pre_computed/%s_ccrawl_ids.json' % (name,))
            )
            for i, (k, _) in enumerate(ccrawl_ids_maps[name]):
                reddit_id_group[k] = (i * 10) // len(ccrawl_ids_maps[name])
        select_ccid = make_ccid_filter(ccrawl_ids_maps, opt['n_urls'])

    print("loaded URL selection")
    # organize directories
    if not isdir(output_dir):
        subprocess.run(['mkdir', output_dir], stdout=subprocess.PIPE)
    if not isdir(pjoin(output_dir, 'tmp')):
        subprocess.run(['mkdir', pjoin(output_dir, 'tmp')], stdout=subprocess.PIPE)
    if using_specific_ids:
        make_docs_directory(output_dir, 'specific_ids')
    elif using_specific_urls:
        make_docs_directory(output_dir, 'specific_urls')
    else:
        for name in sr_names:
            make_docs_directory(output_dir, name)

    # initialize article buckets (10 per output category); these are refilled
    # below if part of this slice was already processed
    if using_specific_ids:
        articles = {'specific_ids': {i: [] for i in range(10)}}
        mode = 'ids'
    elif using_specific_urls:
        articles = {'specific_urls': {i: [] for i in range(10)}}
        mode = 'urls'
    else:
        articles = {name: {i: [] for i in range(10)} for name in sr_names}
        mode = 'subreddits'
    # check progress of slice or if slice is finished
    if isfile(pjoin(output_dir, 'tmp', 'counts_%s_%d.json' % (mode, opt['slnum']))):
        start_line = json.load(
            open(pjoin(output_dir, 'tmp', 'counts_%s_%d.json' % (mode, opt['slnum'])))
        )
        if start_line == 'finished':
            return True
        for name in articles:
            for i_st in range(10):
                d_name = pjoin(output_dir, name, str(i_st))
                articles[name][i_st] = json.load(
                    open(pjoin(d_name, "docs_slice_%05d.json" % (opt['slnum'])))
                )
        print(
            "WET files already processed in this slice:",
            start_line - opt['slnum'] * opt['slsize'],
        )
    else:
        start_line = opt['slnum'] * opt['slsize']
    # Download and parse slice of args.slsize WET files
    st_time = time()
    for i in range(start_line, min((opt['slnum'] + 1) * opt['slsize'], len(url_lst))):
        # Download wet file from amazon AWS
        dl_time = time()
        fname = url_lst[i].split('/')[-1][:-3]
        # download and unzip if necessary
        fpath = pjoin(output_dir, 'tmp', fname)
        print("processing", fpath)
        if not isfile(fpath):
            ct_try = 0
            while not isfile(fpath):
                subprocess.run(['rm', fpath + ".gz"], stdout=subprocess.PIPE)
                while not isfile(fpath + ".gz"):
                    url = "https://commoncrawl.s3.amazonaws.com/" + url_lst[i]
                    subprocess.run(
                        ['wget', '-P', pjoin(output_dir, 'tmp'), url],
                        stdout=subprocess.PIPE,
                    )
                    print("download:", time() - dl_time)
                    ct_try += 1
                    if ct_try > 5 and not isfile(fpath + ".gz"):
                        print("giving up on file", fname)
                        break
                downloaded = isfile(fpath + ".gz")
                if downloaded:
                    subprocess.run(['gunzip', fpath + ".gz"], stdout=subprocess.PIPE)
                    print("download and gunzip:", time() - dl_time)
                if ct_try > 5 and not isfile(fpath):
                    print("giving up on file", fname)
                    break
        else:
            downloaded = isfile(fpath)
        if not downloaded:
            print("FAILED DOWNLOADING ", fpath)
            continue
        # Extract, tokenize, and filter articles by language
        f = open(fpath, buffering=4096)
        article_url = ''
        article_id = ''
        article_txt = ''
        last_line = ''
        read_text = False
        ct = 0
        start_time = time()
        ccid_path_tuple = False
        # check and save pages by IDs if getting posts by IDs, or by URLs
        # if using URLs
        for line in f:
            if line.startswith("WARC/1.0"):
                if ccid_path_tuple:
                    ct += 1
                    article = {
                        'ccid': article_id,
                        'url': article_url,
                        'text': word_url_tokenize(article_txt),
                    }
                    if not using_specific_urls and not using_specific_ids:
                        name, eli_k, num = ccid_path_tuple
                        articles[name][reddit_id_group[eli_k]] += [
                            (eli_k, num, article)
                        ]
                    else:
                        name, num = ccid_path_tuple
                        articles[name][num % 10] += [(num, article)]
                article_txt = ''
                read_text = False
            if line.startswith("WARC-Target-URI"):
                try:
                    article_url = line.strip().split()[-1]
                    if using_specific_urls:
                        ccid_path_tuple = check_url(select_urls, article_url)
                except Exception:
                    article_url = '<UNK>'
                    if using_specific_urls:
                        ccid_path_tuple = False
            if line.startswith("WARC-Record-ID"):
                try:
                    article_id = line.strip().split()[-1]
                    if not using_specific_urls:
                        ccid_path_tuple = select_ccid.get(article_id, False)
                except Exception:
                    article_id = '<UNK>'
                    if not using_specific_urls:
                        ccid_path_tuple = False
            if read_text and (last_line.strip() + line.strip()) != '':
                article_txt += line + '\n'
                last_line = line
            if line.startswith("Content-Length: ") and ccid_path_tuple:
                read_text = True
        if ccid_path_tuple:
            ct += 1
            article = {
                'ccid': article_id,
                'url': article_url,
                'text': word_url_tokenize(article_txt),
            }

            if not using_specific_urls and not using_specific_ids:
                name, eli_k, num = ccid_path_tuple
                articles[name][reddit_id_group[eli_k]] += [(eli_k, num, article)]
            else:
                name, num = ccid_path_tuple
                articles[name][num % 10] += [(num, article)]
        f.close()
        subprocess.run(['rm', fpath], stdout=subprocess.PIPE)
        # periodically save slice
        print(">>>>>>>>>> ARTICLES FOUND %d in %.2f" % (ct, time() - start_time))
        if i % opt['save_freq'] == opt['save_freq'] - 1:
            for name, elik_maps in articles.items():
                print('saving', name, i, len(elik_maps))
                for i_st, ls in elik_maps.items():
                    d_name = pjoin(output_dir, name, str(i_st))
                    if not isdir(d_name):
                        subprocess.run(['mkdir', d_name], stdout=subprocess.PIPE)
                    json.dump(
                        ls,
                        open(
                            pjoin(d_name, "docs_slice_%05d.json" % (opt['slnum'])), 'w'
                        ),
                    )
            json.dump(
                i + 1,
                open(
                    pjoin(
                        output_dir, 'tmp', 'counts_%s_%d.json' % (mode, opt['slnum'])
                    ),
                    'w',
                ),
            )
            print('saved json files %.2f' % (time() - start_time,))
    # save items to slices
    for name, elik_maps in articles.items():
        print('saving', name, i, len(elik_maps))
        for i_st, ls in elik_maps.items():
            d_name = pjoin(output_dir, name, str(i_st))
            if not isdir(d_name):
                subprocess.run(['mkdir', d_name], stdout=subprocess.PIPE)
            json.dump(
                ls, open(pjoin(d_name, "docs_slice_%05d.json" % (opt['slnum'])), 'w')
            )
    print('saved json files %.2f' % (time() - start_time,))
    json.dump(
        'finished',
        open(pjoin(output_dir, 'tmp', 'counts_%s_%d.json' % (mode, opt['slnum'])), 'w'),
    )
    print("processing slice %d took %f seconds" % (i, time() - st_time))