in lambdas/stream_station_status_to_es/index.py [0:0]
def lambda_handler(event, context):
delete_count = 0
update_count = 0
for record in event['Records']:
# Get the primary key of the station
station_id = record['dynamodb']['Keys']['station_id']['N']
# If deleted form DynamoDB, remove from ES
if record['eventName'] == 'REMOVE':
r = requests.delete('{}{}'.format(es_url, station_id), auth = awsauth)
delete_count += 1
# Update record so we don't overwrite station details
else:
# Deserialize the image
new_image = record['dynamodb']['NewImage']
deserializer = TypeDeserializer()
deserialized_image = json.dumps({k: deserializer.deserialize(v) for k, v in new_image.items()}, cls=DecimalEncoder)
payload = {
'doc': json.loads(deserialized_image),
'doc_as_upsert': True
}
# Insert or update to ES
r = requests.post('{}{}/_update'.format(es_url, station_id), auth = awsauth, headers = headers, json = payload)
update_count += 1
print('[DEBUG] Processed station_id: {}. Request response: {}'.format(station_id, r.text))
message = '[INFO] Deleted {} stations. Insert or updated {} stations.'.format(delete_count, update_count)
print(message)