def get_own_snapshots_source()

in lambda/snapshots_tool_utils.py [0:0]


def get_own_snapshots_source(pattern, response, backup_interval=None):
# Filters our own snapshots
    filtered = {}

    for snapshot in response['DBSnapshots']:
        
        # No need to consider snapshots that are still in progress
        if 'SnapshotCreateTime' not in snapshot:
            continue

        # No need to get tags for snapshots outside of the backup interval
        if backup_interval and snapshot['SnapshotCreateTime'].replace(tzinfo=None) < datetime.utcnow().replace(tzinfo=None) - timedelta(hours=backup_interval):
            continue

        if snapshot['SnapshotType'] == 'manual' and re.search(pattern, snapshot['DBInstanceIdentifier']) and snapshot['Engine'] in _SUPPORTED_ENGINES:
            client = boto3.client('rds', region_name=_REGION)
            response_tags = client.list_tags_for_resource(
                ResourceName=snapshot['DBSnapshotArn'])

            if search_tag_created(response_tags):
                filtered[snapshot['DBSnapshotIdentifier']] = {
                    'Arn': snapshot['DBSnapshotArn'], 'Status': snapshot['Status'], 'DBInstanceIdentifier': snapshot['DBInstanceIdentifier']}

        elif snapshot['SnapshotType'] == 'manual' and (pattern == 'ALL_CLUSTERS' or pattern == 'ALL_SNAPSHOTS' or pattern == 'ALL_INSTANCES') and snapshot['Engine'] in _SUPPORTED_ENGINES:
            client = boto3.client('rds', region_name=_REGION)
            response_tags = client.list_tags_for_resource(
                ResourceName=snapshot['DBSnapshotArn'])

            if search_tag_created(response_tags):
                filtered[snapshot['DBSnapshotIdentifier']] = {
                    'Arn': snapshot['DBSnapshotArn'], 'Status': snapshot['Status'], 'DBInstanceIdentifier': snapshot['DBInstanceIdentifier']}

    return filtered