in preprocess/WikiExtractor.py [0:0]
def process_dump(input_file, template_file, out_file, file_size, file_compress,
process_count):
"""
:param input_file: name of the Wikipedia dump file; '-' to read from stdin
:param template_file: optional file with template definitions.
:param out_file: directory where to store the extracted files, or '-' for stdout
:param file_size: max size of each extracted file, or None for no max (one file)
:param file_compress: whether to compress output files with bzip2.
:param process_count: number of extraction processes to spawn.
"""
if input_file == '-':
input = sys.stdin
else:
input = fileinput.FileInput(input_file, openhook=fileinput.hook_compressed)
# collect siteinfo
for line in input:
# When the input file is .bz2 or .gz, line can be bytes even in Python 3.
if not isinstance(line, text_type): line = line.decode('utf-8')
m = tagRE.search(line)
if not m:
continue
tag = m.group(2)
if tag == 'base':
# discover urlbase from the xml dump file
# /mediawiki/siteinfo/base
base = m.group(3)
options.urlbase = base[:base.rfind("/")]
elif tag == 'namespace':
mk = keyRE.search(line)
if mk:
nsid = ''.join(mk.groups())
else:
nsid = ''
options.knownNamespaces[m.group(3)] = nsid
if re.search('key="10"', line):
options.templateNamespace = m.group(3)
options.templatePrefix = options.templateNamespace + ':'
elif re.search('key="828"', line):
options.moduleNamespace = m.group(3)
options.modulePrefix = options.moduleNamespace + ':'
elif tag == '/siteinfo':
break
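# For illustration, the <siteinfo> header scanned above typically looks like the
# following (sample values are assumptions, not taken from this repository):
#
#   <siteinfo>
#     <base>https://en.wikipedia.org/wiki/Main_Page</base>
#     <namespaces>
#       <namespace key="0" />
#       <namespace key="10" case="first-letter">Template</namespace>
#       <namespace key="828" case="first-letter">Module</namespace>
#     </namespaces>
#   </siteinfo>
#
# With such a header, options.urlbase becomes "https://en.wikipedia.org/wiki",
# options.templateNamespace "Template" and options.moduleNamespace "Module".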
if options.expand_templates:
# preprocess
template_load_start = default_timer()
if template_file:
if os.path.exists(template_file):
logging.info("Loading template definitions from: %s", template_file)
# can't use a with statement here; the file is closed explicitly below:
file = fileinput.FileInput(template_file,
openhook=fileinput.hook_compressed)
load_templates(file)
file.close()
else:
if input_file == '-':
# stdin cannot be rewound for a second pass; require an explicit template_file
raise ValueError("to use templates with a dump read from stdin, an explicit template file must be supplied")
logging.info("Preprocessing '%s' to collect template definitions: this may take some time.", input_file)
load_templates(input, template_file)
input.close()
input = fileinput.FileInput(input_file, openhook=fileinput.hook_compressed)
template_load_elapsed = default_timer() - template_load_start
logging.info("Loaded %d templates in %.1fs", len(options.templates), template_load_elapsed)
# process pages
logging.info("Starting page extraction from %s.", input_file)
extract_start = default_timer()
# Parallel Map/Reduce:
# - pages to be processed are dispatched to workers
# - a reduce process collects the results, sorts them and prints them.
process_count = max(1, process_count)
maxsize = 10 * process_count
# output queue
output_queue = Queue(maxsize=maxsize)
if out_file == '-':
out_file = None
worker_count = process_count
# load balancing
max_spool_length = 10000
spool_length = Value('i', 0, lock=False)
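# spool_length is a shared counter through which the reduce process reports its
# backlog; the mapper loop below reads it to apply back-pressure when the
# reducer falls behind.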
# reduce job that sorts and prints output
reduce = Process(target=reduce_process,
args=(options, output_queue, spool_length,
out_file, file_size, file_compress))
reduce.start()
# initialize jobs queue
jobs_queue = Queue(maxsize=maxsize)
# start worker processes
logging.info("Using %d extract processes.", worker_count)
workers = []
for i in range(worker_count):
extractor = Process(target=extract_process,
args=(options, i, jobs_queue, output_queue))
extractor.daemon = True # only live while parent process lives
extractor.start()
workers.append(extractor)
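# Pipeline wiring at this point, before the mapper loop below:
#
#   pages_from(input) --> jobs_queue --> extract_process x N --> output_queue --> reduce_process
#
# A minimal sketch of the same worker pattern (do_work is a placeholder, not a
# function from this module):
#
#   def worker(jobs, results):
#       for job in iter(jobs.get, None):    # None is the termination sentinel
#           results.put(do_work(job))
#
# The real jobs also carry page_num (see below), which lets the reduce process
# emit pages in their original order.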
# Mapper process
page_num = 0
for page_data in pages_from(input):
id, revid, title, ns, catSet, page = page_data
if keepPage(ns, catSet, page):
# back-pressure: slow down if the reduce process is falling behind
delay = 0
if spool_length.value > max_spool_length:
# wait until the spool has drained to 10% of its maximum length
while spool_length.value > max_spool_length/10:
time.sleep(10)
delay += 10
if delay:
logging.info('Delay %ds', delay)
job = (id, revid, title, page, page_num)
jobs_queue.put(job) # goes to any available extract_process
page_num += 1
page = None # free memory
input.close()
# signal termination
for _ in workers:
jobs_queue.put(None)
# wait for workers to terminate
for w in workers:
w.join()
# signal end of work to reduce process
output_queue.put(None)
# wait for it to finish
reduce.join()
extract_duration = default_timer() - extract_start
extract_rate = page_num / extract_duration
logging.info("Finished %d-process extraction of %d articles in %.1fs (%.1f art/s)",
process_count, page_num, extract_duration, extract_rate)
logging.info("total of page: %d, total of articl page: %d; total of used articl page: %d" % (g_page_total, g_page_articl_total,g_page_articl_used_total))