def run_job()

in distant_supervision/synthetic_data_creator.py


    def run_job(self, sc, article_rdd, phrase_rdd, metric_fptr):
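        """Build distant-supervision data from an article corpus.

        Derives a phrase RDD when one is not supplied, optionally subsamples
        the corpus, maps entities to candidate queries, generates
        distant-supervision examples, writes them under self.output_dir,
        and prints summary statistics to metric_fptr.
        """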
        total_nb_articles = article_rdd.count()
        print('Number of articles in corpus: {}\n'.format(total_nb_articles), file=metric_fptr)

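        # Derive the phrase RDD from the corpus when the caller did not provide one.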
        if phrase_rdd is None:
            phrase_rdd = self._calculate_phrase_rdd(article_rdd, metric_fptr)

        phrase_set = convert_ner_rdd_to_set(phrase_rdd, self.nb_ner_ulim)

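        # Cap the number of articles when an upper limit (ulim_count) is configured.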
        if self.ulim_count is not None:
            subsampled_article_rdd = self._perform_subsample_by_count(
                article_rdd,
                self.ulim_count,
                tot_count=total_nb_articles)
        else:
            subsampled_article_rdd = article_rdd

        if self.debug_save:
            subsampled_article_rdd.cache()  # cache only in debug mode: the count() below forces a second pass over the RDD
            print('Number of subsampled articles: {}\n'.format(subsampled_article_rdd.count()), file=metric_fptr)

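        # Build the entity-to-queries pair RDD over the (subsampled) corpus.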
        entity2queries_prdd = EntityToQueriesMapper(self.phrase_mode).get_entity_to_queries_v2(
            sc, subsampled_article_rdd, phrase_set, metric_fptr)

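        # Generate distant-supervision examples per partition, then flatten the per-record lists.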
        ds_data_rdd_nonflat = entity2queries_prdd.mapPartitions(self._compute_ds_data_by_partition)
        ds_data_rdd = ds_data_rdd_nonflat.flatMap(lambda x: x)

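        # Persist: the RDD is consumed three times below (style split, save, stats).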
        ds_data_rdd.persist(StorageLevel.MEMORY_AND_DISK)

        # Split examples by style and write each style to its own output folder.
        self._split_by_style_and_write(ds_data_rdd)

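        # Also save the full example set, one serialized example per line, under <output_dir>/core_data.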
        ds_data_rdd.map(lambda x: x.jsonify()).saveAsTextFile(os.path.join(self.output_dir, 'core_data'))

        StatComputation().print_output_stats(ds_data_rdd, metric_fptr)
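
The mapPartitions/flatMap pair above is the usual PySpark idiom when a per-partition function emits a variable number of output records per input element. A minimal, self-contained sketch of that pattern follows; generate_examples and the toy data are illustrative stand-ins for _compute_ds_data_by_partition, not code from this repository:

    from pyspark import SparkContext

    def generate_examples(partition):
        # Like _compute_ds_data_by_partition: emit a list of outputs per input record.
        for record in partition:
            yield [record * 2, record * 2 + 1]

    sc = SparkContext('local[2]', 'mappartitions-flatmap-demo')
    rdd = sc.parallelize(range(4), numSlices=2)
    nonflat = rdd.mapPartitions(generate_examples)   # RDD whose elements are lists
    flat = nonflat.flatMap(lambda x: x)              # flatten the lists into individual records
    print(flat.collect())                            # [0, 1, 2, 3, 4, 5, 6, 7]
    sc.stop()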