def check_codepipeline_cfn_template()

in src/helper.py [0:0]


def check_codepipeline_cfn_template(template):
    """Checks CodePipeline (CloudFormation Template) against DynamoDB Rules database
    to ensure security / governance policies

    Args:
        template (str): AWS CloudFormation template local location

    Returns:
        :obj: Returns list of results from security scan
    """
    cp_found = False
    scan_stages = None
    results = list()
    logger.info("Checking CodePipeline Template for Compliance")

    # Get Rules from DynamoDB Table
    scan_results = scan_dynamodb(table=os.environ['DYNAMODB_TABLE'])
    logger.info(f"Parsing through rules {scan_results}")

    if scan_results.get('Count', 0) < 0:
        logger.warning(f"No Items found in DynamoDBTable:{os.environ['DYNAMODB_TABLE']}")
        results.append("NoItemsFound")
        return results

    yaml.SafeLoader.add_multi_constructor('!', lambda l, suffix, node: None)
    with open(template, 'r') as stream:
        _json = yaml.safe_load(stream)

    # Make sure current cfn file has CodePipeline in it
    for _key, _value in _json['Resources'].items():
        for __key, __value in _value.items():
            if __key == "Type" and __value == "AWS::CodePipeline::Pipeline":
                cp_found = True

            if cp_found and isinstance(__value, dict) and __key == 'Properties':
                scan_stages = __value

    # Scan each item in table
    if scan_stages:
        for x in scan_results['Items']:
            logger.info(f"Scanning Rule:{x['RuleNumber']}")
            if x['PatternType'] == 'All':
                results.append(compare_template_items(x, scan_stages))

    else:
        logger.warning("No CodePipeline Template found to Scan against.")
        results.append("NoPipelineStagesFound")

    logger.info(f"Scan Results:{results}")
    return results