def lambda_handler()

in amazon_kinesis_data_analytics_for_apache_flink_snapshot_manager.py [0:0]


def _wait_for_snapshot_ready(kinesis_analytics, flink_app_name, app_version, snapshot_name, wait_time_seconds,
                             max_checks=4):
    """
    Poll the application's snapshots until the requested snapshot is the newest one and is READY.

    Every poll counts towards ``max_checks``. This includes polls where the snapshot list is empty or where the
    newest snapshot is a different one, so the wait is always bounded.

    :param kinesis_analytics: boto3 'kinesisanalyticsv2' client, passed through to list_flink_app_snapshots.
    :param flink_app_name: Name of the Flink application.
    :param app_version: Application version id whose snapshots are listed.
    :param snapshot_name: Name of the snapshot that was just requested.
    :param wait_time_seconds: Seconds to sleep before each poll.
    :param max_checks: Maximum number of polls before giving up.
    :return: The newest snapshot description if it is ``snapshot_name`` with status 'READY'; otherwise None.
    """
    for _ in range(max_checks):
        time.sleep(wait_time_seconds)
        snapshots = list_flink_app_snapshots(kinesis_analytics, flink_app_name, app_version)
        print('Application {0} of version {1} has {2} snapshots: '.format(flink_app_name,
                                                                          app_version,
                                                                          len(snapshots)))
        if not snapshots:
            # Nothing listed yet: treat as a failed check rather than indexing into an empty list.
            print('No snapshot found with the name: {0}'.format(snapshot_name))
            continue
        # Newest snapshot by creation time (first one wins on ties, same as a stable reverse sort).
        latest_snapshot = max(snapshots, key=lambda k: k['SnapshotCreationTimestamp'])
        if latest_snapshot['SnapshotName'] != snapshot_name:
            print('No snapshot found with the name: {0}'.format(snapshot_name))
        elif latest_snapshot['SnapshotStatus'] == 'READY':
            return latest_snapshot
    return None


def lambda_handler(event, context):
    """
    AWS Lambda function's handler function.

    It takes a snapshot of a Kinesis Data Analytics Flink application, waits for it to become READY, and then
    deletes the oldest snapshots so that only the most recent X snapshots remain. For X, see the environment
    variable 'number_of_older_snapshots_to_retain'.

    Configuration is read from environment variables: aws_region, app_name, snapshot_manager_ddb_table_name,
    primary_partition_key_name, primary_sort_key_name, sns_topic_arn, number_of_older_snapshots_to_retain and
    snapshot_creation_wait_time_seconds.

    :param event: Lambda invocation event; only logged.
    :param context: Lambda context object; unused.
    :return: dict with 'statusCode' 200 and a JSON 'body' summarising this run.
    :raises KeyError: if a required environment variable is missing.
    :raises ValueError: if a numeric environment variable is not an integer.
    """
    print('Running Snapshot Manager. Input event:', json.dumps(event, indent=4))
    # read environment variables
    region = os.environ['aws_region']
    flink_app_name = os.environ['app_name']
    ddb_table_name = os.environ['snapshot_manager_ddb_table_name']
    primary_partition_key_name = os.environ['primary_partition_key_name']
    primary_sort_key_name = os.environ['primary_sort_key_name']
    sns_topic_arn = os.environ['sns_topic_arn']
    num_of_older_snapshots_to_retain = int(os.environ['number_of_older_snapshots_to_retain'])
    snapshot_creation_wait_time_seconds = int(os.environ['snapshot_creation_wait_time_seconds'])

    # setup clients
    sns = boto3.client('sns', region)
    dynamodb = boto3.client('dynamodb', region)
    kinesis_analytics = boto3.client('kinesisanalyticsv2', region)

    # initialize variables
    deleted_snapshots = []
    not_deleted_snapshots = []
    # Holds references to the two lists above, so appends to them are reflected here.
    snapshot_deletion_status = {
        "deleted_snapshots": deleted_snapshots,
        "not_deleted_snapshots": not_deleted_snapshots
    }
    # Millisecond wall-clock timestamp; doubles as a unique suffix for the snapshot name.
    snapshot_manager_run_id = int(round(time.time() * 1000))
    snapshot_name = 'custom_' + str(snapshot_manager_run_id)
    response_body = {
        "app_name": flink_app_name,
        "app_version": "",
        "snapshot_manager_run_id": snapshot_manager_run_id,
        "new_snapshot_name": snapshot_name,
        "app_is_running": True,
        "app_is_healthy": True,
        "new_snapshot_initiated": False,
        "new_snapshot_completed": False,
        "new_snapshot_creation_delayed": False,
        "old_snapshots_to_be_deleted": False,
        "num_of_snapshot_deleted": 0,
        "num_of_snapshot_not_deleted": 0
    }
    print('Snapshot Manager Execution Status. Run Id: {0}'.format(snapshot_manager_run_id))

    # describe application to get application status and current version
    response = describe_flink_application(kinesis_analytics, flink_app_name)
    response_body['app_version'] = response['ApplicationDetail']['ApplicationVersionId']

    # If application is running then take a snapshot
    snapshot_creation_res = None
    if response['ApplicationDetail']['ApplicationStatus'] == 'RUNNING':
        snapshot_creation_res = take_app_snapshot(kinesis_analytics, flink_app_name, snapshot_name)
        if snapshot_creation_res['is_initiated']:
            response_body['new_snapshot_initiated'] = True
        else:
            response_body['app_is_healthy'] = False
    else:
        response_body['app_is_running'] = False
        error_message = 'A new snapshot cannot be taken. Flink application {0} is not running.'.format(flink_app_name)
        print(error_message)
        notify_error(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, error_message)

    # If application is not healthy then send a notification
    if not response_body['app_is_healthy']:
        error_message = 'A new snapshot cannot be taken now. Flink application {0} may not be healthy.'.format(
            flink_app_name)
        print(error_message)
        notify_error(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, error_message)

    # If new snapshot creation was initiated then wait (bounded) for it to complete
    latest_snapshot = None
    if response_body['new_snapshot_initiated']:
        latest_snapshot = _wait_for_snapshot_ready(kinesis_analytics, flink_app_name, response_body['app_version'],
                                                   snapshot_creation_res['snapshot_name'],
                                                   snapshot_creation_wait_time_seconds)
        if latest_snapshot is not None:
            response_body['new_snapshot_completed'] = True
            print(response_body)
            send_sns_notification(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, snapshot_name,
                                  latest_snapshot, True)
        else:
            # Newly initiated snapshot is not completed on time: send a notification
            print("Snapshot creation has been delayed")
            response_body['new_snapshot_creation_delayed'] = True
            send_sns_notification(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, snapshot_name, None,
                                  False)

    # Old snapshot cleanup and tracking only happen once the new snapshot is safely READY.
    if response_body['new_snapshot_completed']:
        snapshots_after_new_snapshot = list_flink_app_snapshots(kinesis_analytics, flink_app_name,
                                                                response_body['app_version'])
        # check if the number of snapshots exceeds the threshold.
        if len(snapshots_after_new_snapshot) > num_of_older_snapshots_to_retain:
            response_body['old_snapshots_to_be_deleted'] = True
            # Newest first; everything past the first `num_of_older_snapshots_to_retain` entries is deleted.
            sorted_snapshots_after_new_snapshot = sorted(snapshots_after_new_snapshot,
                                                         key=lambda k: k['SnapshotCreationTimestamp'], reverse=True)
            snapshots_to_be_deleted = sorted_snapshots_after_new_snapshot[num_of_older_snapshots_to_retain:]
            for snapshot_to_be_deleted in snapshots_to_be_deleted:
                snapshot_deleted = delete_snapshot(kinesis_analytics, flink_app_name, snapshot_to_be_deleted)
                if snapshot_deleted:
                    deleted_snapshots.append(snapshot_to_be_deleted)
                    print(
                        'Snapshot deleted: {0}, name: {1}'.format(snapshot_deleted,
                                                                  snapshot_to_be_deleted['SnapshotName']))
                else:
                    not_deleted_snapshots.append(snapshot_to_be_deleted)
            response_body['num_of_snapshot_deleted'] = len(deleted_snapshots)
            response_body['num_of_snapshot_not_deleted'] = len(not_deleted_snapshots)
        else:
            # print() for consistency with the rest of this handler (no logger is set up here).
            print('Number of historical snapshots less than the threshold. No need to delete any snapshots.')

        # Tracking
        track_snapshot_manager_status(dynamodb, ddb_table_name, primary_partition_key_name, primary_sort_key_name,
                                      flink_app_name, snapshot_manager_run_id, latest_snapshot,
                                      snapshot_deletion_status)

    return_response = {'statusCode': 200, 'body': json.dumps(response_body)}
    return return_response