def bulk_index()

in source/consumer/lambda_handler.py [0:0]


def _flush_bulk_payload(es_object, es_index, asset, actions_to_send, items):
    """Send one batch of bulk lines to Elasticsearch and print the outcome.

    Args:
        es_object: Client object exposing ``bulk(index=..., body=...)``.
        es_index: Name of the target index.
        asset: Asset identifier, used only in the success message.
        actions_to_send: Alternating action/document JSON lines for the batch.
        items: The source items contained in this batch; printed on failure
            so the log shows exactly what was not stored.

    Any exception raised by ``es_object.bulk`` is caught and printed; the
    insert is best-effort and never raises to the caller.
    """
    if not actions_to_send:
        # Nothing buffered (e.g. the very first item alone hit the size
        # limit); sending an empty bulk body would only produce an error.
        return
    actions = '\n'.join(actions_to_send)
    print("bulk insert payload size: " + str(len(actions)))
    try:
        es_object.bulk(
            index=es_index,
            body=actions
        )
    except Exception as e:
        print('Unable to load data into es:', e)
        print("Data: ", items)
    else:
        print("Successfully stored data in elasticsearch for asset: ", asset)


def bulk_index(es_object, asset, index, data, max_payload_size=8000000):
    """Insert ``data`` into Elasticsearch using size-limited bulk requests.

    Every item is tagged in place with ``item["AssetId"] = asset``, serialized
    with ``json.dumps`` and queued behind an ``index`` action line. Queued
    lines are sent with ``es_object.bulk`` whenever adding the next item would
    bring the payload to ``max_payload_size`` characters or more, and once
    more at the end for the remainder.

    Args:
        es_object: Client object exposing ``bulk(index=..., body=...)``.
        asset: Asset identifier stored on every item under ``"AssetId"``.
        index: Index suffix; the target index is ``"mie" + index`` lowercased.
        data: Sized iterable of JSON-serializable mutable mappings.
        max_payload_size: Payload size threshold, in characters, at which the
            queued lines are flushed before adding another item.

    Returns:
        None. Failures of individual bulk requests are printed, not raised.
    """
    if len(data) == 0:
        print("Data is empty. Skipping insert to Elasticsearch.")
        return
    es_index = "mie{index}".format(index=index).lower()
    # Elasticsearch will respond with an error like, "Request size exceeded 10485760 bytes"
    # if the bulk insert exceeds a maximum payload size. To avoid that, the default max
    # payload size is well below the "Maximum Size of HTTP Request Payloads" for the
    # smallest AWS Elasticsearch instance type (10MB). See service limits here:
    # https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/aes-limits.html

    # The action line is identical for every document, so build it once.
    action = json.dumps({"index": {"_index": es_index, "_type": "_doc"}})
    actions_to_send = []
    pending_items = []
    # Running length of '\n'.join(actions_to_send); tracked incrementally so
    # the pending payload is not re-joined for every item.
    payload_size = 0
    for item in data:
        item["AssetId"] = asset
        doc = json.dumps(item)
        if payload_size + len(action) + len(doc) >= max_payload_size:
            # send and reset payload before appending the current item
            _flush_bulk_payload(
                es_object, es_index, asset, actions_to_send, pending_items
            )
            actions_to_send = []
            pending_items = []
            payload_size = 0
        if actions_to_send:
            # newline that joins this pair to the previously queued lines
            payload_size += 1
        actions_to_send.append(action)
        actions_to_send.append(doc)
        pending_items.append(item)
        # action + separating newline + document
        payload_size += len(action) + 1 + len(doc)
    # finally send whatever is still queued
    print("sending final bulk insert")
    _flush_bulk_payload(es_object, es_index, asset, actions_to_send, pending_items)