in distant_supervision/ds_es_client.py [0:0]
def _index_rdd_partition(self, article_lst):
    """Bulk-index every sentence of the given articles.

    One bulk action is produced per sentence found in ``article.sents`` for
    each article in ``article_lst``. Actions are generated lazily and handed
    to ``helpers.bulk`` together with ``request_timeout=60``.

    Each indexed document carries the sentence text, the same text prefixed
    with the article title, the article id and title, and the sentence's
    entities and noun chunks serialized as JSON strings.

    Args:
        article_lst: iterable of article objects exposing ``id``, ``title``
            and ``sents``; each sentence exposes ``id``, ``text``, ``ents``
            and ``noun_chunks``.
    """
    es_conf = self.es_conf
    client = ElasticsearchMagic.get_instance('singleton', hosts=[es_conf.hosts])

    def _build_action(article, sentence):
        """Return the bulk action describing a single sentence document."""
        sentence_text = sentence.text
        return {
            # `create` will fail on duplicate _id
            '_op_type': 'create',
            '_index': es_conf.index_name,
            '_type': es_conf.doc_type,
            '_id': sentence.id,
            '_source': {
                'body': sentence_text,
                'body_with_title': '{} \n {}'.format(article.title, sentence_text),
                'article_id': article.id,
                'article_title': article.title,
                'entities': json.dumps(sentence.ents),
                'noun_chunks': json.dumps(sentence.noun_chunks),
            },
        }

    # Imported at call time, immediately before use, as in the original.
    from elasticsearch6 import helpers

    helpers.bulk(
        client,
        (
            _build_action(article, sentence)
            for article in article_lst
            for sentence in article.sents
        ),
        request_timeout=60,
    )