in amazon_kinesis_data_analytics_for_apache_flink_snapshot_manager.py [0:0]
def list_flink_app_snapshots(kin_analytics, flink_app_name, app_ver_id):
"""
This function get a list of snapshots for a Kinesis Data Analytics Flink Application
:param kin_analytics:
:param flink_app_name:
:param app_ver_id:
:return:
"""
app_snapshots_latest_version = []
try:
response = kin_analytics.list_application_snapshots(ApplicationName=flink_app_name, Limit=10)
snapshot_summary_list = response['SnapshotSummaries']
for snapshot_summary in snapshot_summary_list:
if app_ver_id == snapshot_summary['ApplicationVersionId']:
app_snapshots_latest_version.append(snapshot_summary)
# process next set list of items if 'NextToken' exist in the response
while 'NextToken' in response:
response = kin_analytics.list_application_snapshots(
ApplicationName=flink_app_name, Limit=50, NextToken=response['NextToken']
)
snapshot_summary_list = response['SnapshotSummaries']
for snapshot_summary in snapshot_summary_list:
if app_ver_id == snapshot_summary['ApplicationVersionId']:
app_snapshots_latest_version.append(snapshot_summary)
except botocore.exceptions.ClientError as error:
if error.response['Error']['Code'] == 'ResourceNotFoundException':
logger.warning('The requested Kinesis Data Analytics Flink Application was not found')
else:
print('Error Message: {}'.format(error.response['Error']['Message']))
return app_snapshots_latest_version