in lib/ansible/modules/cloud/alicloud/_alicloud_security_group.py [0:0]
def validate_format_sg_rules(module, inbound_rules=None, outbound_rules=None):
"""
Validate and format security group for inbound and outbound rules
:param module: Ansible module object
:param inbound_rules: Inbound rules for authorization to validate and format
:param outbound_rules: Outbound rules for authorization to validate and format
:return:
"""
# aliases for rule
ip_protocol_aliases = ('ip_protocol', 'proto')
inbound_cidr_ip_aliases = ('source_cidr_ip', 'cidr_ip')
outbound_cidr_ip_aliases = ('dest_cidr_ip', 'cidr_ip')
inbound_group_id_aliases = ('source_group_id', 'group_id')
outbound_group_id_aliases = ('dest_group_id', 'group_id')
inbound_group_owner_aliases = ('source_group_owner_id', 'group_owner_id')
outbound_group_owner_aliases = ('dest_group_owner_id', 'group_owner_id')
cidr_ip_aliases = {
"inbound": inbound_cidr_ip_aliases,
"outbound": outbound_cidr_ip_aliases,
}
group_id_aliases = {
"inbound": inbound_group_id_aliases,
"outbound": outbound_group_id_aliases,
}
group_owner_aliases = {
"inbound": inbound_group_owner_aliases,
"outbound": outbound_group_owner_aliases,
}
COMMON_VALID_PARAMS = ('proto', 'ip_protocol', 'cidr_ip', 'group_id', 'group_owner_id',
'nic_type', 'policy', 'priority', 'port_range')
INBOUND_VALID_PARAMS = ('source_cidr_ip', 'source_group_id', 'source_group_owner_id')
OUTBOUND_VALID_PARAMS = ('dest_cidr_ip', 'dest_group_id', 'dest_group_owner_id')
rule_types = []
rule_choice = {
"inbound": inbound_rules,
"outbound": outbound_rules,
}
valid_params = {
"inbound": INBOUND_VALID_PARAMS,
"outbound": OUTBOUND_VALID_PARAMS,
}
if inbound_rules:
rule_types.append('inbound')
if outbound_rules:
rule_types.append('outbound')
for rule_type in rule_types:
rules = rule_choice.get(rule_type)
total_rules = 0
if rules:
total_rules = len(rules)
if total_rules != 0:
for rule in rules:
if not isinstance(rule, dict):
module.fail_json(msg='Invalid rule parameter type [%s].' % type(rule))
for k in rule:
if k not in COMMON_VALID_PARAMS and k not in valid_params.get(rule_type):
module.fail_json(msg='Invalid rule parameter \'{}\''.format(k))
ip_protocol = get_alias_value(rule, ip_protocol_aliases)
if ip_protocol is None:
module.fail_json(msg="Ip Protocol required for rule authorization")
port_range = get_alias_value(rule, ['port_range'])
if port_range is None:
module.fail_json(msg="Port range is required for rule authorization")
# verifying whether group_id is provided and cidr_ip is not, so nic_type should be set to intranet
cidr_ip = get_alias_value(rule, cidr_ip_aliases.get(rule_type))
if cidr_ip is None:
if get_alias_value(rule, group_id_aliases.get(rule_type)) is not None:
if 'nic_type' in rule:
if not rule['nic_type'] == "intranet":
module.fail_json(msg="In mutual security group authorization (namely, "
"GroupId is specified, while CidrIp is not specified), "
"you must specify the nic_type as intranet")
else:
module.fail_json(msg="In mutual security group authorization (namely, "
"GroupId is specified, while CidrIp is not specified), "
"you must specify the nic_type as intranet")
# format rules to return for authorization
formatted_rule = {}
formatted_rule['ip_protocol'] = ip_protocol
formatted_rule['port_range'] = port_range
if cidr_ip:
formatted_rule['cidr_ip'] = cidr_ip
group_id = get_alias_value(rule, group_id_aliases.get(rule_type))
if group_id:
formatted_rule['group_id'] = group_id
group_owner_id = get_alias_value(rule, group_owner_aliases.get(rule_type))
if group_owner_id:
formatted_rule['group_owner_id'] = group_owner_id
if 'nic_type' in rule:
if rule['nic_type']:
formatted_rule['nic_type'] = rule['nic_type']
if 'policy' in rule:
if rule['policy']:
formatted_rule['policy'] = rule['policy']
if 'priority' in rule:
if rule['priority']:
formatted_rule['priority'] = rule['priority']
rule.clear()
rule.update(formatted_rule)