def process_access_layers()

in Azure Firewall/Script - Migrate Checkpoint config to Azure Firewall Policy/chkp2azfw.py [0:0]


def process_access_layers(layer_list, ipgroups):
    global cnt_netrules_ip, cnt_netrules_fqdn, cnt_chkp_rules
    last_action = None
    for layer in layer_list:
        for rule in layer:
            # Check rule is a dictionary and contains a type key
            if isinstance(rule, dict) and 'type' in rule:
                if rule['type'] == 'access-rule':
                    cnt_chkp_rules += 1
                    # Rule Name and action
                    rule_name = rule['name']
                    rule_action = str(find_members(policy_objects, rule['action'], member_list=[])[0])
                    # If there is a change from deny to allow, or from allow to deny, or if this is the first rule, we need to create a rule collection
                    if rule_action != last_action:
                        rule_collection = {
                            'name': rc_net_name + '-' + rule_action + '-' + str(len(az_net_rcs)),
                            'action': rule_action,
                            'rules': []
                        }
                        # Append the rule collection to the list of rule collections and set last_action to the new value
                        az_net_rcs.append(rule_collection)
                        last_action = rule_action
                    # action/src/dst/svc object Members
                    rule_src_members = find_members(policy_objects, rule['source'], member_list=[], mode='ip')
                    rule_dst_members = find_members(policy_objects, rule['destination'], member_list=[], mode='ip')
                    rule_svc_members = find_members(policy_objects, rule['service'], member_list=[], mode='svc')
                    # Print
                    if len(rule_src_members) > 0 and len(rule_dst_members) > 0 and len(rule_svc_members) > 0:
                        # 'sourceServiceTags' and 'destinationServiceTags' are auxiliary fields, since the service tags go actually in the 'sourceAddresses' and 'destinationAddresses' fields
                        # The fields will be removed in the function append_rule
                        new_rule = {
                            'name': rule['name'] + '-' + str(rule['uid']),
                            'ruleType': 'NetworkRule',
                            'sourceAddresses': [],
                            'sourceIpGroups': [],
                            'destinationAddresses': [],
                            'destinationFqdns': [],
                            'destinationIpGroups': [],
                            'sourceServiceTags': [],
                            'destinationServiceTags': []
                        }
                        if not args.rule_id_to_name:
                            new_rule['name'] = rule['name']
                        if len(rule_src_members) == 1 and is_ipgroup(ipgroups, rule_src_members[0]):
                            new_rule['sourceIpGroups'].append(find_ipgroup(ipgroups, rule_src_members[0]))['name']
                        else:
                            for src in rule_src_members:
                                if src == 'any' or src == '*' or 'any' in src or src[0] == 'any':
                                    new_rule['sourceAddresses'] = [ '*' ]
                                elif is_ipv4(src):
                                    if src not in new_rule['sourceAddresses']:
                                        new_rule['sourceAddresses'].append(src)
                                # If not an IP address, it must be a service tag
                                elif src not in new_rule['sourceAddresses']:
                                    if src not in new_rule['sourceServiceTags']:
                                        new_rule['sourceServiceTags'].append(src)
                        if len(rule_dst_members) == 1 and is_ipgroup(ipgroups, rule_dst_members[0]):
                            new_rule['destinationIpGroups'].append(find_ipgroup(ipgroups, rule_dst_members[0]))['name']
                        else:
                            for dst in rule_dst_members:
                                if dst == 'any' or dst == '*' or 'any' in dst:
                                    cnt_netrules_ip += 1
                                    new_rule['destinationAddresses'] = [ '*' ]
                                elif is_fqdn(dst):
                                    cnt_netrules_fqdn += 1
                                    if dst not in new_rule['destinationFqdns']:
                                        cnt_netrules_fqdn += 1
                                        new_rule['destinationFqdns'].append(dst)
                                elif is_ipv4(dst):
                                    if dst not in new_rule['destinationAddresses']:
                                        cnt_netrules_ip += 1
                                        new_rule['destinationAddresses'].append(dst)
                                # If not an IP address or a domain name, it must be a service tag
                                else:
                                    if dst not in new_rule['destinationServiceTags']:
                                        new_rule['destinationServiceTags'].append(dst)
                        # Services are in an array of 2-tuples (protocol, port)
                        if 'any' in rule_svc_members:
                            new_rule['ipProtocols'] = ['Any']
                            new_rule['destinationPorts'] = [ '*' ]
                        else:
                            new_rule['ipProtocols'] = []
                            new_rule['destinationPorts'] = []
                            for svc in rule_svc_members:
                                protocol = svc[0]
                                port = svc[1]
                                if protocol == 'tcp' or protocol == 'udp':
                                    if protocol not in new_rule['ipProtocols']:
                                        new_rule['ipProtocols'].append(protocol)
                                    if port not in new_rule['destinationPorts']:
                                        # Checkpoint accepts the syntax >1024, but Azure does not
                                        if port[0] == '>':
                                            new_rule['destinationPorts'].append(str(int(port[1:]) + 1) + '-65535')
                                        else:
                                            new_rule['destinationPorts'].append(port)
                                elif protocol == 'icmp':
                                    if protocol not in new_rule['ipProtocols']:
                                        new_rule['ipProtocols'].append(protocol)
                                    new_rule['destinationPorts'] = [ '*' ]
                                elif protocol == 'any':
                                    new_rule['ipProtocols'] = ['Any']
                                    new_rule['destinationPorts'] = [ '*' ]
                                else:
                                    print('ERROR: Unknown service protocol', protocol, 'in rule', rule_name, file=sys.stderr)
                        # Add new rule to the latest rule collection (the one we are working on)
                        if args.remove_explicit_deny and rule_action == 'Drop' and new_rule['sourceAddresses'] == [ '*' ] and new_rule['destinationAddresses'] == [ '*' ] and new_rule['destinationPorts'] == [ '*' ] and new_rule['ipProtocols'] == ['Any']:
                            discarded_rules.append(rule['uid'])
                            if log_level >= 6:
                                print('INFO: Skipping rule "{0}" as it is an explicit catch-all deny rule'.format(rule_name), file=sys.stderr)
                        else:
                            az_net_rcs[-1]['rules'] = append_rule(new_rule, az_net_rcs[-1]['rules'])
                    # If one of the objects was empty, add to the discarded rules
                    else:
                        discarded_rules.append(rule['uid'])