in qldb_streaming_sample/app.py [0:0]
def lambda_handler(event, context):
"""
Triggered for a batch of kinesis records.
Parses QLDB Journal streams and sends an SNS notification for Person and Vehicle Registration Events.
"""
sns_topic_arn = os.environ['SNS_ARN']
raw_kinesis_records = event['Records']
# Deaggregate all records in one call
records = deaggregate_records(raw_kinesis_records)
# Iterate through deaggregated records
for record in records:
# Kinesis data in Python Lambdas is base64 encoded
payload = base64.b64decode(record['kinesis']['data'])
# payload is the actual ion binary record published by QLDB to the stream
ion_record = ion.loads(payload)
print("Ion reocord: ", (ion.dumps(ion_record, binary=False)))
if (("recordType" in ion_record) and (ion_record["recordType"] == REVISION_DETAILS_RECORD_TYPE)):
revision_data, revision_metadata = get_data_metdata_from_revision_record(ion_record)
table_info = get_table_info_from_revision_record(ion_record)
if (revision_metadata["version"] == 0): # a new record inserted
if (table_info and table_info["tableName"] == PERSON_TABLENAME and person_data_has_required_fields(
revision_data)):
send_sns_notification(sns_topic_arn,
'New User Registered. Name: {first_name} {last_name}'
.format(first_name=revision_data["FirstName"],
last_name=revision_data["LastName"]))
elif (table_info and table_info[
"tableName"] == VEHICLE_REGISTRATION_TABLENAME and vehicle_registration_data_has_required_fields(
revision_data)):
send_sns_notification(sns_topic_arn, 'New Vehicle Registered. '
'VIN: {vin}, LicensePlateNumber: {license_plate_number}'
.format(vin=revision_data["VIN"],
license_plate_number=revision_data["LicensePlateNumber"]))
else:
print("No Action")
return {
'statusCode': 200
}