def bulk()

in src/plugins/brokers/kibbleES.py [0:0]


    def bulk(self):
        """ Push pending JSON objects in the queue to ES"""
        xjson = self.json_queue
        js_arr = []
        self.json_queue = []
        for entry in xjson:
            js = entry
            doc = js
            js['@version'] = 1
            dbname = self.broker.config['elasticsearch']['database']
            if self.broker.noTypes:
                dbname += "_%s" % js['doctype']
                js_arr.append({
                    '_op_type': 'update' if js.get('upsert') else 'index',
                    '_index': dbname,
                    '_type': '_doc',
                    '_id': js['id'],
                    'doc' if js.get('upsert') else '_source': doc,
                    'doc_as_upsert': True,
                })
            else:
                js_arr.append({
                    '_op_type': 'update' if js.get('upsert') else 'index',
                    '_index': dbname,
                    '_type': js['doctype'],
                    '_id': js['id'],
                    'doc' if js.get('upsert') else '_source': doc,
                    'doc_as_upsert': True,
                })
        try:
            elasticsearch.helpers.bulk(self.broker.oDB, js_arr)
        except Exception as err:
            pprint("Warning: Could not bulk insert: %s" % err)