in src/qldb_streaming_to_es_sample/helpers/filtered_records_generator.py [0:0]
def filtered_records_generator(kinesis_deaggregate_records, table_names=None):
for record in kinesis_deaggregate_records:
# Kinesis data in Python Lambdas is base64 encoded
payload = base64.b64decode(record['kinesis']['data'])
# payload is the actual ion binary record published by QLDB to the stream
ion_record = ion.loads(payload)
print("Ion record: ", (ion.dumps(ion_record, binary=False)))
if ("recordType" in ion_record) and (ion_record["recordType"] == REVISION_DETAILS_RECORD_TYPE):
table_info = get_table_info_from_revision_record(ion_record)
if not table_names or (table_info and (table_info["tableName"] in table_names)):
revision_data, revision_metadata = get_data_metdata_from_revision_record(ion_record)
yield {"table_info": table_info,
"revision_data": revision_data,
"revision_metadata": revision_metadata}