in source/consumer/lambda_handler.py [0:0]
def bulk_index(es_object, asset, index, data):
    if len(data) == 0:
        print("Data is empty. Skipping insert to Elasticsearch.")
        return
    es_index = "mie{index}".format(index=index).lower()
    actions_to_send = []
    # Elasticsearch will respond with an error like "Request size exceeded 10485760 bytes"
    # if a bulk insert exceeds the maximum payload size. To avoid that, we use a max payload
    # size that is well below the "Maximum Size of HTTP Request Payloads" for the smallest AWS
    # Elasticsearch instance type (10MB). See service limits here:
    # https://docs.aws.amazon.com/elasticsearch-service/latest/developerguide/aes-limits.html
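    # Using 8,000,000 bytes leaves roughly 2.5 MB of headroom below the 10,485,760-byte limit.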
    maxPayloadSize = 8000000
    for item in data:
        item["AssetId"] = asset
        action = json.dumps({"index": {"_index": es_index, "_type": "_doc"}})
        doc = json.dumps(item)
        if (len('\n'.join(actions_to_send)) + len(action) + len(doc)) < maxPayloadSize:
            actions_to_send.append(action)
            actions_to_send.append(doc)
        else:
            # send and reset payload before appending the current item
            actions = '\n'.join(actions_to_send)
            print("bulk insert payload size: " + str(len(actions)))
            try:
                es_object.bulk(
                    index=es_index,
                    body=actions
                )
            except Exception as e:
                print('Unable to load data into es:', e)
                print("Data: ", item)
            else:
                print("Successfully stored data in elasticsearch for asset: ", asset)
            # now reset the payload and append the current item
            actions_to_send = [action, doc]
    # finally, send whatever is left in the batch
    print("sending final bulk insert")
    actions = '\n'.join(actions_to_send)
    try:
        es_object.bulk(
            index=es_index,
            body=actions
        )
    except Exception as e:
        print('Unable to load data into es:', e)
        print("Data: ", data)
    else:
        print("Successfully stored data in elasticsearch for asset: ", asset)