in Azure Firewall/Script - Migrate Checkpoint config to Azure Firewall Policy/chkp2azfw.py [0:0]
def process_access_layers(layer_list, ipgroups):
    """Translate Checkpoint access rules into Azure Firewall network rules.

    Walks every rule of every layer in ``layer_list``. Each dictionary whose
    ``'type'`` is ``'access-rule'`` is counted in ``cnt_chkp_rules`` and its
    action, source, destination and service objects are resolved through
    ``find_members``. A new rule collection is appended to the module-level
    ``az_net_rcs`` list every time the resolved action differs from the action
    of the previously processed rule, so the relative order of rules with
    different actions is kept.

    A rule whose source, destination or service resolves to an empty list is
    not translated; its ``uid`` is appended to ``discarded_rules``. The same
    happens to a catch-all ``Drop`` rule when ``args.remove_explicit_deny`` is
    set. Every other rule is handed to ``append_rule`` together with the rule
    list of the most recent rule collection.

    Args:
        layer_list: Iterable of layers, each one an iterable of rule entries.
            Entries that are not dictionaries, or that are not access rules,
            are ignored.
        ipgroups: IP group definitions, passed through to ``is_ipgroup`` and
            ``find_ipgroup``.

    Side effects:
        Updates the global counters ``cnt_chkp_rules``, ``cnt_netrules_ip``
        and ``cnt_netrules_fqdn``, and mutates the module-level
        ``az_net_rcs`` and ``discarded_rules`` lists.
    """
    global cnt_netrules_ip, cnt_netrules_fqdn, cnt_chkp_rules
    last_action = None
    for layer in layer_list:
        for rule in layer:
            # Only dictionaries describing an access rule are translated
            if not (isinstance(rule, dict) and rule.get('type') == 'access-rule'):
                continue
            cnt_chkp_rules += 1

            # Rule name and action
            rule_name = rule['name']
            rule_action = str(find_members(policy_objects, rule['action'], member_list=[])[0])

            # A change of action (or the very first rule) starts a new rule
            # collection. This happens before the member check below, so a
            # discarded rule can still open a collection.
            if rule_action != last_action:
                az_net_rcs.append({
                    'name': rc_net_name + '-' + rule_action + '-' + str(len(az_net_rcs)),
                    'action': rule_action,
                    'rules': []
                })
                last_action = rule_action

            # Resolve the src/dst/svc objects referenced by the rule
            rule_src_members = find_members(policy_objects, rule['source'], member_list=[], mode='ip')
            rule_dst_members = find_members(policy_objects, rule['destination'], member_list=[], mode='ip')
            rule_svc_members = find_members(policy_objects, rule['service'], member_list=[], mode='svc')

            # If one of the objects was empty, add to the discarded rules
            if not (rule_src_members and rule_dst_members and rule_svc_members):
                discarded_rules.append(rule['uid'])
                continue

            # 'sourceServiceTags' and 'destinationServiceTags' are auxiliary
            # fields, since the service tags actually go in the
            # 'sourceAddresses' and 'destinationAddresses' fields.
            # The original comments say append_rule removes these fields.
            if args.rule_id_to_name:
                az_rule_name = rule_name + '-' + str(rule['uid'])
            else:
                az_rule_name = rule_name
            new_rule = {
                'name': az_rule_name,
                'ruleType': 'NetworkRule',
                'sourceAddresses': [],
                'sourceIpGroups': [],
                'destinationAddresses': [],
                'destinationFqdns': [],
                'destinationIpGroups': [],
                'sourceServiceTags': [],
                'destinationServiceTags': []
            }

            # Sources
            if len(rule_src_members) == 1 and is_ipgroup(ipgroups, rule_src_members[0]):
                # The name is looked up on the IP group itself; list.append()
                # returns None and must not be subscripted.
                new_rule['sourceIpGroups'].append(find_ipgroup(ipgroups, rule_src_members[0])['name'])
            else:
                for src in rule_src_members:
                    if src in ('any', '*') or 'any' in src:
                        new_rule['sourceAddresses'] = ['*']
                    elif is_ipv4(src):
                        if src not in new_rule['sourceAddresses']:
                            new_rule['sourceAddresses'].append(src)
                    # If not an IP address, it must be a service tag
                    elif src not in new_rule['sourceAddresses']:
                        if src not in new_rule['sourceServiceTags']:
                            new_rule['sourceServiceTags'].append(src)

            # Destinations
            if len(rule_dst_members) == 1 and is_ipgroup(ipgroups, rule_dst_members[0]):
                new_rule['destinationIpGroups'].append(find_ipgroup(ipgroups, rule_dst_members[0])['name'])
            else:
                for dst in rule_dst_members:
                    if dst in ('any', '*') or 'any' in dst:
                        cnt_netrules_ip += 1
                        new_rule['destinationAddresses'] = ['*']
                    elif is_fqdn(dst):
                        # Count an FQDN once, and only when it is actually
                        # added, consistent with the IP address counter.
                        if dst not in new_rule['destinationFqdns']:
                            cnt_netrules_fqdn += 1
                            new_rule['destinationFqdns'].append(dst)
                    elif is_ipv4(dst):
                        if dst not in new_rule['destinationAddresses']:
                            cnt_netrules_ip += 1
                            new_rule['destinationAddresses'].append(dst)
                    # If not an IP address or a domain name, it must be a service tag
                    elif dst not in new_rule['destinationServiceTags']:
                        new_rule['destinationServiceTags'].append(dst)

            # Services are in an array of 2-tuples (protocol, port)
            if 'any' in rule_svc_members:
                new_rule['ipProtocols'] = ['Any']
                new_rule['destinationPorts'] = ['*']
            else:
                new_rule['ipProtocols'] = []
                new_rule['destinationPorts'] = []
                for svc in rule_svc_members:
                    protocol = svc[0]
                    port = svc[1]
                    if protocol == 'tcp' or protocol == 'udp':
                        if protocol not in new_rule['ipProtocols']:
                            new_rule['ipProtocols'].append(protocol)
                        # Checkpoint accepts the syntax >1024, but Azure does
                        # not. Translate before de-duplicating so the same
                        # open-ended range is not added twice.
                        if port.startswith('>'):
                            port = str(int(port[1:]) + 1) + '-65535'
                        if port not in new_rule['destinationPorts']:
                            new_rule['destinationPorts'].append(port)
                    elif protocol == 'icmp':
                        if protocol not in new_rule['ipProtocols']:
                            new_rule['ipProtocols'].append(protocol)
                        new_rule['destinationPorts'] = ['*']
                    elif protocol == 'any':
                        new_rule['ipProtocols'] = ['Any']
                        new_rule['destinationPorts'] = ['*']
                    else:
                        print('ERROR: Unknown service protocol', protocol, 'in rule', rule_name, file=sys.stderr)

            # Add new rule to the latest rule collection (the one we are
            # working on), unless it is an explicit catch-all deny that the
            # user asked to drop.
            if (args.remove_explicit_deny
                    and rule_action == 'Drop'
                    and new_rule['sourceAddresses'] == ['*']
                    and new_rule['destinationAddresses'] == ['*']
                    and new_rule['destinationPorts'] == ['*']
                    and new_rule['ipProtocols'] == ['Any']):
                discarded_rules.append(rule['uid'])
                if log_level >= 6:
                    print('INFO: Skipping rule "{0}" as it is an explicit catch-all deny rule'.format(rule_name), file=sys.stderr)
            else:
                az_net_rcs[-1]['rules'] = append_rule(new_rule, az_net_rcs[-1]['rules'])