in src/qldb_streaming_to_es_sample/app.py [0:0]
def lambda_handler(event, context):
"""
Triggered for a batch of kinesis records.
Parses QLDB Journal streams and indexes documents to Elasticsearch for
Person and Vehicle Registration Events.
"""
raw_kinesis_records = event['Records']
# Deaggregate all records in one call
records = deaggregate_records(raw_kinesis_records)
# Iterate through deaggregated records of Person and VehicleRegistration Table
for record in filtered_records_generator(records,
table_names=[Constants.PERSON_TABLENAME,
Constants.VEHICLE_REGISTRATION_TABLENAME]):
table_name = record["table_info"]["tableName"]
revision_data = record["revision_data"]
revision_metadata = record["revision_metadata"]
version = revision_metadata["version"]
document = None
if revision_data:
# if record is for Person table and is an insert event
if (table_name == Constants.PERSON_TABLENAME) and (version == 0) and \
__fields_are_present(Constants.PERSON_TABLE_FIELDS, revision_data):
document = __create_document(Constants.PERSON_TABLE_FIELDS, revision_data)
elasticsearch_client.index(index=TABLE_TO_INDEX_MAP[table_name],
id=revision_metadata["id"], body=document, version=version)
# if record is for VehicleRegistration table and is an insert or update event
elif table_name == Constants.VEHICLE_REGISTRATION_TABLENAME and \
__fields_are_present(Constants.VEHICLE_REGISTRATION_TABLE_FIELDS, revision_data):
document = __create_document(Constants.VEHICLE_REGISTRATION_TABLE_FIELDS, revision_data)
elasticsearch_client.index(index=TABLE_TO_INDEX_MAP[table_name],
id=revision_metadata["id"], body=document, version=version)
else:
# delete record
elasticsearch_client.delete(index=TABLE_TO_INDEX_MAP[table_name],
id=revision_metadata["id"], version=version)
return {
'statusCode': 200
}