in src/cfnlint/rules/resources/RetentionPeriodOnResourceTypesWithAutoExpiringContent.py [0:0]
def match(self, cfn):
    """Report resources that leave a data-retention attribute unset.

    For every resource whose ``Type`` appears in the table below, each listed
    attribute is looked up in each property set returned by
    ``get_safe('Properties')``. A match is recorded when:

    * the property set itself is empty,
    * the attribute is absent or falsy, or
    * the attribute is a dict containing a ``Ref`` to ``AWS::NoValue``.

    NOTE(review): the attribute test is a truthiness check, so an explicit
    ``0`` or empty string is reported with the same "default retention
    period" message as a missing attribute.

    Returns the list of ``RuleMatch`` objects, in discovery order.
    """
    found = []

    # Resource type -> retention attributes to inspect. Only 'Attribute' is
    # read in this method; 'SourceUrl' is carried along as reference data.
    retention_attributes_by_resource_type = {
        'AWS::Kinesis::Stream': [
            {
                'Attribute': 'RetentionPeriodHours',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-stream.html#cfn-kinesis-stream-retentionperiodhours'
            },
        ],
        'AWS::SQS::Queue': [
            {
                'Attribute': 'MessageRetentionPeriod',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-sqs-queues.html#aws-sqs-queue-msgretentionperiod'
            },
        ],
        'AWS::DocDB::DBCluster': [
            {
                'Attribute': 'BackupRetentionPeriod',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-docdb-dbcluster.html#cfn-docdb-dbcluster-backupretentionperiod'
            },
        ],
        'AWS::Synthetics::Canary': [
            {
                'Attribute': 'SuccessRetentionPeriod',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-synthetics-canary.html#cfn-synthetics-canary-successretentionperiod'
            },
            {
                'Attribute': 'FailureRetentionPeriod',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-synthetics-canary.html#cfn-synthetics-canary-failureretentionperiod'
            },
        ],
        'AWS::Redshift::Cluster': [
            {
                'Attribute': 'AutomatedSnapshotRetentionPeriod',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-redshift-cluster.html#cfn-redshift-cluster-automatedsnapshotretentionperiod'
            },
        ],
        'AWS::RDS::DBInstance': [
            {
                'Attribute': 'BackupRetentionPeriod',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-rds-database-instance.html#cfn-rds-dbinstance-backupretentionperiod'
            },
        ],
        'AWS::RDS::DBCluster': [
            {
                'Attribute': 'BackupRetentionPeriod',
                'SourceUrl': 'http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-rds-dbcluster.html#cfn-rds-dbcluster-backuprententionperiod'
            },
        ],
    }

    # One message serves all three findings; only the reported path differs.
    template = 'The default retention period will delete the data after a pre-defined time. Set an explicit values to avoid data loss on resource : %s'

    def _report(location):
        """Record a match at ``location`` using the shared message."""
        found.append(
            RuleMatch(location, template % '/'.join(str(part) for part in location)))

    for resource_name, resource in cfn.get_resources().items():
        resource_type = resource.get('Type')
        if resource_type not in retention_attributes_by_resource_type:
            continue

        for attribute_spec in retention_attributes_by_resource_type[resource_type]:
            attribute = attribute_spec.get('Attribute')

            for properties, sub_path in resource.get_safe('Properties'):
                location = ['Resources', resource_name] + sub_path

                # Nothing configured at all: report once and move on.
                if not properties:
                    _report(location)
                    continue

                configured = properties.get(attribute)
                if not configured:
                    _report(location)

                # Only dict values can hide a Ref to AWS::NoValue.
                if not isinstance(configured, dict):
                    continue

                # pylint: disable=protected-access
                ref_paths = cfn._search_deep_keys(
                    'Ref', configured, location + [attribute])
                for ref_path in ref_paths:
                    # The last path element is the Ref target; the match is
                    # reported at the path leading up to it.
                    if ref_path[-1] == 'AWS::NoValue':
                        _report(ref_path[0:-1])

    return found