in amazon_kinesis_data_analytics_for_apache_flink_snapshot_manager.py [0:0]
def track_snapshot_manager_status(dynamodb, ddb_table_name, primary_partition_key, primary_sort_key, app_name,
                                  snapshot_manager_run_id, new_snapshot, snapshot_deletion_status):
    """
    Record the outcome of one Snapshot Manager run as an item in a DynamoDB table.

    The item is keyed by ``app_name`` (stored as a string under the partition key
    attribute) and ``snapshot_manager_run_id`` (stored as a number under the sort
    key attribute). It also carries the new snapshot's name, creation timestamp and
    application version id, each stringified. The deleted / not-deleted snapshot
    collections are stringified and attached only when they are non-empty.

    :param dynamodb: object exposing ``put_item(TableName=..., Item=...)``
        (presumably a boto3 DynamoDB client - confirm against the caller)
    :param ddb_table_name: name of the table the item is written to
    :param primary_partition_key: attribute name of the table's partition key
    :param primary_sort_key: attribute name of the table's sort key
    :param app_name: value stored under the partition key
    :param snapshot_manager_run_id: value stored (as a DynamoDB number) under the sort key
    :param new_snapshot: mapping providing 'SnapshotName', 'SnapshotCreationTimestamp'
        and 'ApplicationVersionId'
    :param snapshot_deletion_status: mapping providing 'deleted_snapshots' and
        'not_deleted_snapshots'
    :return: True when ``put_item`` answered with HTTP status 200, otherwise False
        (including when a ``botocore`` ClientError was raised and logged)
    """
    # Build the item outside the try block: nothing here can raise ClientError,
    # so the handler below stays scoped to the DynamoDB call only.
    item = {
        primary_partition_key: {'S': app_name},
        primary_sort_key: {'N': str(snapshot_manager_run_id)},
        'new_snapshot_name': {'S': str(new_snapshot['SnapshotName'])},
        'new_snapshot_create_time': {'S': str(new_snapshot['SnapshotCreationTimestamp'])},
        'flink_app_version_id': {'S': str(new_snapshot['ApplicationVersionId'])},
    }

    # Optional attributes are written only when there is something to report.
    deleted_snapshots = snapshot_deletion_status['deleted_snapshots']
    if deleted_snapshots:
        item['snapshots_deleted'] = {'S': str(deleted_snapshots)}

    not_deleted_snapshots = snapshot_deletion_status['not_deleted_snapshots']
    if not_deleted_snapshots:
        item['snapshots_failed_to_be_deleted'] = {'S': str(not_deleted_snapshots)}

    try:
        put_item_response = dynamodb.put_item(TableName=ddb_table_name, Item=item)
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'ResourceNotFoundException':
            logger.warning('The requested DynamoDB table was not found')
        else:
            # Report through the logger (not print) so the message honours the
            # configured log level and handlers like every other message here.
            logger.error('Error Message: %s', error.response['Error']['Message'])
        return False

    status_code = put_item_response['ResponseMetadata']['HTTPStatusCode']
    if status_code == 200:
        logger.info('An item inserted successfully')
        return True

    # Do not fail silently: leave a trace of why the run was not recorded.
    logger.warning('Inserting the item returned HTTP status code %s', status_code)
    return False