def lambda_handler()

in src/qldb_streaming_to_es_sample/app.py [0:0]


def lambda_handler(event, context):
    """
    Triggered for a batch of kinesis records.
    Parses QLDB Journal streams and indexes documents to Elasticsearch for
    Person and Vehicle Registration Events.
    """
    raw_kinesis_records = event['Records']

    # Deaggregate all records in one call
    records = deaggregate_records(raw_kinesis_records)

    # Iterate through deaggregated records of Person and VehicleRegistration Table
    for record in filtered_records_generator(records,
                                             table_names=[Constants.PERSON_TABLENAME,
                                                          Constants.VEHICLE_REGISTRATION_TABLENAME]):
        table_name = record["table_info"]["tableName"]
        revision_data = record["revision_data"]
        revision_metadata = record["revision_metadata"]
        version = revision_metadata["version"]
        document = None

        if revision_data:
            # if record is for Person table and is an insert event
            if (table_name == Constants.PERSON_TABLENAME) and (version == 0) and \
                    __fields_are_present(Constants.PERSON_TABLE_FIELDS, revision_data):

                document = __create_document(Constants.PERSON_TABLE_FIELDS, revision_data)
                elasticsearch_client.index(index=TABLE_TO_INDEX_MAP[table_name],
                                           id=revision_metadata["id"], body=document, version=version)

            # if record is for VehicleRegistration table and is an insert or update event
            elif table_name == Constants.VEHICLE_REGISTRATION_TABLENAME and \
                    __fields_are_present(Constants.VEHICLE_REGISTRATION_TABLE_FIELDS, revision_data):
                document = __create_document(Constants.VEHICLE_REGISTRATION_TABLE_FIELDS, revision_data)
                elasticsearch_client.index(index=TABLE_TO_INDEX_MAP[table_name],
                                           id=revision_metadata["id"], body=document, version=version)

        else:
            # delete record
            elasticsearch_client.delete(index=TABLE_TO_INDEX_MAP[table_name],
                                           id=revision_metadata["id"], version=version)


    return {
        'statusCode': 200
    }