in distant_supervision/synthetic_data_creator.py [0:0]
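# NOTE: this excerpt assumes the module-level imports used below, e.g.:
#   import os
#   from pyspark import StorageLevel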
def run_job(self, sc, article_rdd, phrase_rdd, metric_fptr):
    total_nb_articles = article_rdd.count()
    print('Number of articles in corpus: {}\n'.format(total_nb_articles), file=metric_fptr)
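    # Derive the phrase RDD from the corpus when none is supplied, then
    # collapse it to a lookup set capped at nb_ner_ulim NER phrases.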
    if phrase_rdd is None:
        phrase_rdd = self._calculate_phrase_rdd(article_rdd, metric_fptr)
    phrase_set = convert_ner_rdd_to_set(phrase_rdd, self.nb_ner_ulim)
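    # Cap the corpus at ulim_count articles when a limit is set; otherwise
    # use the full corpus as-is.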
    if self.ulim_count is not None:
        subsampled_article_rdd = self._perform_subsample_by_count(
            article_rdd,
            self.ulim_count,
            tot_count=total_nb_articles)
    else:
        subsampled_article_rdd = article_rdd
    if self.debug_save:
        subsampled_article_rdd.cache()  # only cache in debug_save mode
    print('Number of subsampled articles: {}\n'.format(subsampled_article_rdd.count()), file=metric_fptr)
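    # Map each entity to its candidate queries over the (subsampled) corpus.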
    entity2queries_prdd = EntityToQueriesMapper(self.phrase_mode).get_entity_to_queries_v2(
        sc, subsampled_article_rdd, phrase_set, metric_fptr)
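    # Generate distant-supervision examples per partition, then flatten the
    # per-partition lists into a single RDD of examples.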
    ds_data_rdd_nonflat = entity2queries_prdd.mapPartitions(self._compute_ds_data_by_partition)
    ds_data_rdd = ds_data_rdd_nonflat.flatMap(lambda x: x)
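    # The DS examples feed three actions below (style-split writes, the
    # core_data dump, and stats), so persist to avoid recomputation.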
    ds_data_rdd.persist(StorageLevel.MEMORY_AND_DISK)
    # Split the examples by style and write each style to its own folder.
    self._split_by_style_and_write(ds_data_rdd)
    ds_data_rdd.map(lambda x: x.jsonify()).saveAsTextFile(os.path.join(self.output_dir, 'core_data'))
    StatComputation().print_output_stats(ds_data_rdd, metric_fptr)