in amazon_kinesis_data_analytics_for_apache_flink_snapshot_manager.py [0:0]
def take_app_snapshot(kin_analytics, flink_app_name, snapshot_name):
    """
    Request a snapshot of a Kinesis Data Analytics Flink application.

    Calls ``create_application_snapshot`` on the supplied client and reports
    the outcome as a dictionary; service errors (``ClientError``) are logged
    and captured in the result instead of being raised to the caller.

    :param kin_analytics: client object exposing
        ``create_application_snapshot(ApplicationName=..., SnapshotName=...)``
        (presumably a boto3 ``kinesisanalyticsv2`` client -- confirm against
        the caller)
    :param flink_app_name: name of the Flink application to snapshot
    :param snapshot_name: name to assign to the new snapshot
    :return: dict with the keys
        ``app_name`` (always ``flink_app_name``),
        ``snapshot_name`` (``snapshot_name`` once the request was accepted,
        otherwise ``""``),
        ``is_initiated`` (``True`` only when the service answered HTTP 200),
        ``error_message`` (failure description, ``""`` on success) and
        ``app_version`` (always ``""``; this function never populates it)
    """
    snapshot_creation_resp = {
        "app_name": flink_app_name,
        "snapshot_name": "",
        "is_initiated": False,
        "error_message": "",
        "app_version": "",
    }
    try:
        res = kin_analytics.create_application_snapshot(
            ApplicationName=flink_app_name,
            SnapshotName=snapshot_name,
        )
    except botocore.exceptions.ClientError as error:
        # Read the error payload defensively so a malformed response cannot
        # raise a KeyError from inside the handler.
        error_details = error.response.get('Error', {})
        error_code = error_details.get('Code', '')
        error_message = error_details.get('Message') or str(error)
        snapshot_creation_resp['error_message'] = error_message
        if error_code == 'ResourceNotFoundException':
            logger.warning('The requested Kinesis Data Analytics Flink Application was not found')
        else:
            # InvalidRequestException and every other error code are reported
            # the same way; use the logger (not print) so the failure reaches
            # the configured log handlers like the other messages here.
            logger.error('Error Message: %s', error_message)
        return snapshot_creation_resp

    status_code = res.get('ResponseMetadata', {}).get('HTTPStatusCode')
    if status_code == 200:
        snapshot_creation_resp['is_initiated'] = True
        snapshot_creation_resp['snapshot_name'] = snapshot_name
        logger.info('Snapshot creation initiated.')
    else:
        # No exception but no HTTP 200 either: do not fail silently, tell the
        # caller why is_initiated stayed False.
        snapshot_creation_resp['error_message'] = (
            'Unexpected HTTP status code: {}'.format(status_code)
        )
        logger.warning(
            'Snapshot creation was not initiated. HTTP status code: %s',
            status_code,
        )
    return snapshot_creation_resp