in src/plugins/brokers/kibbleES.py [0:0]
def bulk(self):
""" Push pending JSON objects in the queue to ES"""
xjson = self.json_queue
js_arr = []
self.json_queue = []
for entry in xjson:
js = entry
doc = js
js['@version'] = 1
dbname = self.broker.config['elasticsearch']['database']
if self.broker.noTypes:
dbname += "_%s" % js['doctype']
js_arr.append({
'_op_type': 'update' if js.get('upsert') else 'index',
'_index': dbname,
'_type': '_doc',
'_id': js['id'],
'doc' if js.get('upsert') else '_source': doc,
'doc_as_upsert': True,
})
else:
js_arr.append({
'_op_type': 'update' if js.get('upsert') else 'index',
'_index': dbname,
'_type': js['doctype'],
'_id': js['id'],
'doc' if js.get('upsert') else '_source': doc,
'doc_as_upsert': True,
})
try:
elasticsearch.helpers.bulk(self.broker.oDB, js_arr)
except Exception as err:
pprint("Warning: Could not bulk insert: %s" % err)