def _index_rdd_partition()

in distant_supervision/ds_es_client.py [0:0]


    def _index_rdd_partition(self, article_lst):
        """Bulk-index every sentence of the given articles into Elasticsearch.

        Each sentence becomes its own document, keyed by the sentence id and
        written to the index / doc type named in ``self.es_conf``. The
        sentence's entities and noun chunks are stored as JSON strings.

        Args:
            article_lst: iterable of article objects. Each one is read for
                ``id``, ``title`` and ``sents``; every sentence is read for
                ``id``, ``text``, ``ents`` and ``noun_chunks``.

        Returns:
            None. The result of the bulk helper is discarded.
        """
        conf = self.es_conf
        client = ElasticsearchMagic.get_instance('singleton', hosts=[conf.hosts])

        def build_action(article, sentence):
            # One bulk action describing a single sentence document.
            sentence_text = sentence.text
            return {
                # `create` (unlike `index`) will fail on a duplicate _id.
                '_op_type': 'create',
                '_index': conf.index_name,
                '_type': conf.doc_type,
                '_id': sentence.id,
                '_source': {
                    'body': sentence_text,
                    'body_with_title': '{} \n {}'.format(article.title, sentence_text),
                    'article_id': article.id,
                    'article_title': article.title,
                    'entities': json.dumps(sentence.ents),
                    'noun_chunks': json.dumps(sentence.noun_chunks),
                },
            }

        def iter_actions():
            # Lazily walk articles -> sentences so the bulk helper can stream
            # the actions instead of holding them all in memory.
            for article in article_lst:
                for sentence in article.sents:
                    yield build_action(article, sentence)

        # Imported at call time, after the client has been obtained.
        from elasticsearch6 import helpers

        helpers.bulk(client, iter_actions(), request_timeout=60)