def filtered_records_generator()

in src/qldb_streaming_to_es_sample/helpers/filtered_records_generator.py [0:0]


def filtered_records_generator(kinesis_deaggregate_records, table_names=None):
    for record in kinesis_deaggregate_records:
        # Kinesis data in Python Lambdas is base64 encoded
        payload = base64.b64decode(record['kinesis']['data'])
        # payload is the actual ion binary record published by QLDB to the stream
        ion_record = ion.loads(payload)
        print("Ion record: ", (ion.dumps(ion_record, binary=False)))

        if ("recordType" in ion_record) and (ion_record["recordType"] == REVISION_DETAILS_RECORD_TYPE):
            table_info = get_table_info_from_revision_record(ion_record)

            if not table_names or (table_info and (table_info["tableName"] in table_names)):
                revision_data, revision_metadata = get_data_metdata_from_revision_record(ion_record)

                yield {"table_info": table_info,
                       "revision_data": revision_data,
                       "revision_metadata": revision_metadata}