in amazon_kinesis_data_analytics_for_apache_flink_snapshot_manager.py [0:0]
def _find_snapshot_by_name(snapshots, snapshot_name):
    """Return the first snapshot summary whose 'SnapshotName' equals ``snapshot_name``, or None if absent."""
    return next((snapshot for snapshot in snapshots if snapshot['SnapshotName'] == snapshot_name), None)


def _select_snapshots_to_delete(snapshots, num_to_retain):
    """Return every snapshot except the ``num_to_retain`` most recent ones, newest first.

    :param snapshots: iterable of snapshot summaries carrying a 'SnapshotCreationTimestamp' key.
    :param num_to_retain: how many of the newest snapshots to keep.
    :return: list of snapshot summaries to delete (may be empty).
    """
    newest_first = sorted(snapshots, key=lambda snapshot: snapshot['SnapshotCreationTimestamp'], reverse=True)
    return newest_first[num_to_retain:]


def _wait_for_snapshot_ready(kinesis_analytics, flink_app_name, app_version, snapshot_name, wait_seconds,
                             max_checks=4):
    """Poll the application's snapshots until ``snapshot_name`` reports status READY.

    Sleeps ``wait_seconds`` before each of at most ``max_checks`` polls. Every poll counts against
    ``max_checks``, including polls where the snapshot is not listed yet, so the loop always terminates.

    :return: the READY snapshot summary, or None if it was not READY within ``max_checks`` polls.
    """
    for _ in range(max_checks):
        time.sleep(wait_seconds)
        snapshots = list_flink_app_snapshots(kinesis_analytics, flink_app_name, app_version)
        print('Application {0} of version {1} has {2} snapshots: '.format(flink_app_name,
                                                                         app_version,
                                                                         len(snapshots)))
        # Look the snapshot up by name rather than assuming it is the newest one: an empty listing
        # or a concurrently created snapshot must not crash or confuse the check.
        snapshot = _find_snapshot_by_name(snapshots, snapshot_name)
        if snapshot is None:
            print('No snapshot found with the name: {0}'.format(snapshot_name))
        elif snapshot['SnapshotStatus'] == 'READY':
            return snapshot
    return None


def lambda_handler(event, context):
    """
    AWS Lambda function's handler function. It takes a snapshot of a Kinesis Data Analytics Flink application,
    retains the most recent X number of snapshots, and deletes the rest. For X, see environment variable
    'number_of_older_snapshots_to_retain'.

    Flow:
      1. Describe the application; if its status is RUNNING, request a snapshot named 'custom_<run id>'.
      2. Poll up to 4 times (sleeping 'snapshot_creation_wait_time_seconds' before each poll) until that
         snapshot reports status READY.
      3. Once READY, list the snapshots of the current application version, keep the newest X and try
         to delete the rest.
      4. Send SNS notifications for not-running / unhealthy / delayed / completed outcomes and, on
         completion, record the run via track_snapshot_manager_status (DynamoDB client).

    :param event: Lambda invocation event; it is only logged.
    :param context: Lambda context object; unused.
    :return: dict with 'statusCode' 200 and a JSON-encoded 'body' summarising the run.
    """
    print('Running Snapshot Manager. Input event:', json.dumps(event, indent=4))
    # read environment variables
    region = os.environ['aws_region']
    flink_app_name = os.environ['app_name']
    ddb_table_name = os.environ['snapshot_manager_ddb_table_name']
    primary_partition_key_name = os.environ['primary_partition_key_name']
    primary_sort_key_name = os.environ['primary_sort_key_name']
    sns_topic_arn = os.environ['sns_topic_arn']
    num_of_older_snapshots_to_retain = int(os.environ['number_of_older_snapshots_to_retain'])
    snapshot_creation_wait_time_seconds = int(os.environ['snapshot_creation_wait_time_seconds'])
    max_checks = 4

    # setup clients
    sns = boto3.client('sns', region)
    dynamodb = boto3.client('dynamodb', region)
    kinesis_analytics = boto3.client('kinesisanalyticsv2', region)

    # initialize variables; the dict holds references to the two lists, which are filled in place below
    deleted_snapshots = []
    not_deleted_snapshots = []
    snapshot_deletion_status = {
        "deleted_snapshots": deleted_snapshots,
        "not_deleted_snapshots": not_deleted_snapshots
    }
    # millisecond epoch timestamp doubles as the run id and makes the snapshot name unique per run
    snapshot_manager_run_id = int(round(time.time() * 1000))
    snapshot_name = 'custom_' + str(snapshot_manager_run_id)
    response_body = {
        "app_name": flink_app_name,
        "app_version": "",
        "snapshot_manager_run_id": snapshot_manager_run_id,
        "new_snapshot_name": snapshot_name,
        "app_is_running": True,
        "app_is_healthy": True,
        "new_snapshot_initiated": False,
        "new_snapshot_completed": False,
        "new_snapshot_creation_delayed": False,
        "old_snapshots_to_be_deleted": False,
        "num_of_snapshot_deleted": 0,
        "num_of_snapshot_not_deleted": 0
    }
    print('Snapshot Manager Execution Status. Run Id: {0}'.format(snapshot_manager_run_id))

    # describe application to get application status and current version
    response = describe_flink_application(kinesis_analytics, flink_app_name)
    app_detail = response['ApplicationDetail']
    app_version = app_detail['ApplicationVersionId']
    response_body['app_version'] = app_version

    # If application is running then take a snapshot
    snapshot_creation_res = None
    if app_detail['ApplicationStatus'] == 'RUNNING':
        snapshot_creation_res = take_app_snapshot(kinesis_analytics, flink_app_name, snapshot_name)
        if snapshot_creation_res['is_initiated']:
            response_body['new_snapshot_initiated'] = True
        else:
            response_body['app_is_healthy'] = False
    else:
        response_body['app_is_running'] = False
        error_message = 'A new snapshot cannot be taken. Flink application {0} is not running.'.format(flink_app_name)
        print(error_message)
        notify_error(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, error_message)

    # If application is not healthy then send a notification
    if not response_body['app_is_healthy']:
        error_message = 'A new snapshot cannot be taken now. Flink application {0} may not be healthy.'.format(
            flink_app_name)
        print(error_message)
        notify_error(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, error_message)

    # If new snapshot creation was initiated then wait (bounded) for it to complete
    new_snapshot = None
    if response_body['new_snapshot_initiated']:
        new_snapshot = _wait_for_snapshot_ready(kinesis_analytics, flink_app_name, app_version,
                                                snapshot_creation_res['snapshot_name'],
                                                snapshot_creation_wait_time_seconds, max_checks)
        if new_snapshot is not None:
            response_body['new_snapshot_completed'] = True
            print(response_body)
            send_sns_notification(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, snapshot_name,
                                  new_snapshot, True)
        else:
            # Newly initiated snapshot did not complete on time: send a notification
            print("Snapshot creation has been delayed")
            response_body['new_snapshot_creation_delayed'] = True
            send_sns_notification(sns, sns_topic_arn, flink_app_name, snapshot_manager_run_id, snapshot_name,
                                  None, False)

    if response_body['new_snapshot_completed']:
        snapshots_after_new_snapshot = list_flink_app_snapshots(kinesis_analytics, flink_app_name, app_version)
        # check if the number of snapshots exceeds the retention threshold
        if len(snapshots_after_new_snapshot) > num_of_older_snapshots_to_retain:
            response_body['old_snapshots_to_be_deleted'] = True
            snapshots_to_be_deleted = _select_snapshots_to_delete(snapshots_after_new_snapshot,
                                                                  num_of_older_snapshots_to_retain)
            for snapshot_to_be_deleted in snapshots_to_be_deleted:
                snapshot_deleted = delete_snapshot(kinesis_analytics, flink_app_name, snapshot_to_be_deleted)
                if snapshot_deleted:
                    deleted_snapshots.append(snapshot_to_be_deleted)
                    print('Snapshot deleted: {0}, name: {1}'.format(snapshot_deleted,
                                                                     snapshot_to_be_deleted['SnapshotName']))
                else:
                    not_deleted_snapshots.append(snapshot_to_be_deleted)
            response_body['num_of_snapshot_deleted'] = len(deleted_snapshots)
            response_body['num_of_snapshot_not_deleted'] = len(not_deleted_snapshots)
        else:
            print('Number of historical snapshots less than the threshold. No need to delete any snapshots.')

        # Tracking
        track_snapshot_manager_status(dynamodb, ddb_table_name, primary_partition_key_name, primary_sort_key_name,
                                      flink_app_name, snapshot_manager_run_id, new_snapshot,
                                      snapshot_deletion_status)

    return {'statusCode': 200, 'body': json.dumps(response_body)}