lib/ansible/modules/cloud/alicloud/ali_security_group.py (373 lines of code) (raw):

#!/usr/bin/python # -*- coding: utf-8 -*- # Copyright (c) 2017-present Alibaba Group Holding Limited. He Guimin <heguimin36@163.com.com> # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see http://www.gnu.org/licenses/. from __future__ import (absolute_import, division, print_function) __metaclass__ = type ANSIBLE_METADATA = {'metadata_version': '1.1', 'status': ['preview'], 'supported_by': 'community'} DOCUMENTATION = ''' --- module: ali_security_group short_description: Manage Alibaba Cloud Security Group and its rules. description: - Create and Delete Security Group, Modify its description and add/remove rules. options: state: description: - Create, delete a security group default: 'present' choices: ['present', 'absent'] type: str name: description: - Name of the security group, which is a string of 2 to 128 Chinese or English characters. It must begin with an uppercase/lowercase letter or a Chinese character and can contain numerals, "_", "." or "-". It cannot begin with http:// or https://. This is used in combination with C(vpc_id) to determine if a Securty Group already exists. required: True aliases: ['group_name'] type: str description: description: - Description of the security group, which is a string of 2 to 256 characters. - It cannot begin with http:// or https://. type: str vpc_id: description: - ID of the VPC to create the group in. This is used in conjunction with the C(name) to ensure idempotence. type: str rules: description: - List of hash/dictionaries firewall inbound rules to enforce in this group (see example). If none are supplied, no inbound rules will be enabled. Each rule has several keys and refer to https://www.alibabacloud.com/help/doc-detail/25554.htm. Each key should be format as under_score. At present, the valid keys including "ip_protocol", "port_range", "source_port_range", "nic_type", "policy", "dest_cidr_ip", "source_cidr_ip", "source_group_id", "source_group_owner_account", "source_group_owner_id", "priority" and "description". type: list elements: dict rules_egress: description: - List of hash/dictionaries firewall outbound rules to enforce in this group (see example). If none are supplied, no outbound rules will be enabled. Each rule has several keys and refer to https://www.alibabacloud.com/help/doc-detail/25560.htm. Each key should be format as under_score. At present, the valid keys including "ip_protocol", "port_range", "source_port_range", "nic_type", "policy", "dest_cidr_ip", "source_cidr_ip", "dest_group_id", "dest_group_owner_account", "dest_group_owner_id", "priority" and "description". type: list elements: dict purge_rules: description: - Purge existing rules on security group that are not found in rules default: True type: bool purge_rules_egress: description: - Purge existing rules_egress on security group that are not found in rules_egress default: True type: bool security_group_id: description: - Security group ID. aliases: ['id'] type: str tags: description: - A hash/dictionaries of security group tags. C({"key":"value"}) aliases: ["group_tags"] type: dict multi_ok: description: - By default the module will not create another Security Group if there is another Security Group with the same I(name) or I(vpc_id). Specify this as true if you want duplicate Security Groups created. There will be conflict when I(multi_ok=True) and I(recent=True). default: False type: bool recent: description: - By default the module will not choose the recent one if there is another Security Group with the same I(name) or I(vpc_id). Specify this as true if you want to target the recent Security Group. There will be conflict when I(multi_ok=True) and I(recent=True). default: False type: bool requirements: - "python >= 3.6" - "footmark >= 1.13.0" extends_documentation_fragment: - alicloud author: - "He Guimin (@xiaozhu36)" ''' EXAMPLES = ''' # Note: These examples do not set authentication details, see the Alibaba Cloud Guide for details. - name: Create a new security group ali_security_group: name: 'AliyunSG' vpc_id: 'vpc-123csecd' - name: Authorize security group ali_security_group: name: 'AliyunSG' vpc_id: 'vpc-123csecd' rules: - ip_protocol: tcp port_range: 1/122 source_cidr_ip: '10.159.6.18/12' rules_egress: - ip_protocol: icmp port_range: -1/-1 source_cidr_ip: 10.0.0.0/10 dest_group_id: 'sg-ce33rdsfe' priority: 1 - name: Delete security grp ali_security_group: name: 'AliyunSG' vpc_id: 'vpc-123csecd' state: absent ''' RETURN = ''' group: description: Dictionary of security group values returned: always type: complex contains: description: description: The Security Group description. returned: always type: str sample: "my ansible group" group_name: description: Security group name sample: "my-ansible-group" type: str returned: always group_id: description: Security group id sample: sg-abcd1234 type: str returned: always id: description: Alias of "group_id". sample: sg-abcd1234 type: str returned: always inner_access_policy: description: Whether can access each other in one security group. sample: True type: bool returned: always tags: description: Tags associated with the security group type: dict sample: - Name: My Security Group From: Ansible type: dict returned: always returned: always vpc_id: description: ID of VPC to which the security group belongs sample: vpc-abcd1234 type: str returned: always permissions: description: Inbound rules associated with the security group. sample: - create_time: "2018-06-28T08:45:58Z" description: "None" dest_cidr_ip: "None" dest_group_id: "None" dest_group_name: "None" dest_group_owner_account: "None" direction: "ingress" ip_protocol: "TCP" nic_type: "intranet" policy: "Accept" port_range: "22/22" priority: 1 source_cidr_ip: "0.0.0.0/0" source_group_id: "None" source_group_name: "None" source_group_owner_account: "None" type: list returned: always permissions_egress: description: Outbound rules associated with the security group. sample: - create_time: "2018-06-28T08:45:59Z" description: "NOne" dest_cidr_ip: "192.168.0.54/32" dest_group_id: "None" dest_group_name: "None" dest_group_owner_account: "None" direction: "egress" ip_protocol: "TCP" nic_type: "intranet" policy: "Accept" port_range: "80/80" priority: 1 source_cidr_ip: "None" source_group_id: "None" source_group_name: "None" source_group_owner_account: "None" type: list returned: always ''' import time from ansible.module_utils.basic import AnsibleModule from ansible.module_utils.alicloud_ecs import ecs_argument_spec, ecs_connect try: from footmark.exception import ECSResponseError HAS_FOOTMARK = True except ImportError: HAS_FOOTMARK = False VALID_INGRESS_PARAMS = ["ip_protocol", "port_range", "source_port_range", "nic_type", "policy", "dest_cidr_ip", "source_cidr_ip", "priority", "description", "source_group_id", "source_group_owner_account", "source_group_owner_id"] VALID_EGRESS_PARAMS = ["ip_protocol", "port_range", "source_port_range", "nic_type", "policy", "dest_cidr_ip", "source_cidr_ip", "priority", "description", "dest_group_id", "dest_group_owner_account", "dest_group_owner_id"] def validate_group_rule_keys(module, rule, direction): if not isinstance(rule, dict): module.fail_json(msg='Invalid rule parameter type [{0}].'.format(type(rule))) VALID_PARAMS = VALID_INGRESS_PARAMS if direction == "egress": VALID_PARAMS = VALID_EGRESS_PARAMS for k in rule: if k not in VALID_PARAMS: module.fail_json(msg="Invalid rule parameter '{0}' for rule: {1}. Supported parametes include: {2}".format(k, rule, VALID_PARAMS)) if 'ip_protocol' not in rule: module.fail_json(msg='ip_protocol is required.') if 'port_range' not in rule: module.fail_json(msg='port_range is required.') # The system response will return upper protocol rule['ip_protocol'] = str(rule['ip_protocol']).upper() def purge_rules(module, group, existing_rule, rules, direction): if not isinstance(existing_rule, dict): module.fail_json(msg='Invalid existing rule type [{0}].'.format(type(existing_rule))) if not isinstance(rules, list): module.fail_json(msg='Invalid rules type [{0}]. The specified rules should be a list.'.format(type(rules))) VALID_PARAMS = VALID_INGRESS_PARAMS if direction == "egress": VALID_PARAMS = VALID_EGRESS_PARAMS # Find the rules which is not in the specified rules find = False for rule in rules: for key in VALID_PARAMS: if not rule.get(key): continue if existing_rule.get(key) != rule.get(key): find = False break find = True if find: break # If it is not found, there will not purge anythind if not find: return group.revoke(existing_rule, direction) return False def group_exists(conn, module, vpc_id, name, security_group_id, multi, recent): """Returns None or a security group object depending on the existence of a security group. When supplied with a vpc_id and Name, it will check them to determine if it is a match otherwise it will assume the Security Group does not exist and thus return None. """ if multi: return None matching_groups = [] filters = {} if vpc_id: filters['vpc_id'] = vpc_id if security_group_id: filters['security_group_id'] = security_group_id try: for g in conn.describe_security_groups(**filters): if name and g.security_group_name != name: continue matching_groups.append(g.get()) except Exception as e: module.fail_json(msg="Failed to describe Security Groups: {0}".format(e)) if len(matching_groups) == 1: return matching_groups[0] elif len(matching_groups) > 1: if recent: return matching_groups[-1] module.fail_json(msg='Currently there are {0} Security Groups that have the same name and ' 'vpc id you specified. If you would like to create anyway ' 'please pass True to the multi_ok param. Or, please pass True to the recent ' 'param to choose the recent one.'.format(len(matching_groups))) return None def main(): argument_spec = ecs_argument_spec() argument_spec.update(dict( state=dict(default='present', type='str', choices=['present', 'absent']), name=dict(type='str', required=True, aliases=['group_name']), description=dict(type='str'), vpc_id=dict(type='str'), security_group_id=dict(type='str', aliases=['id', 'group_id']), tags=dict(type='dict', aliases=['group_tags']), rules=dict(type='list', elements='dict'), rules_egress=dict(type='list', elements='dict'), purge_rules=dict(type='bool', default=True), purge_rules_egress=dict(type='bool', default=True), multi_ok=dict(type='bool', default=False), recent=dict(type='bool', default=False) )) module = AnsibleModule(argument_spec=argument_spec) if HAS_FOOTMARK is False: module.fail_json(msg='footmark is required for the module ali_security_group.') ecs = ecs_connect(module) state = module.params['state'] security_group_id = module.params['security_group_id'] group_name = module.params['name'] if str(group_name).startswith('http://') or str(group_name).startswith('https://'): module.fail_json(msg='Name can not start with http:// or https://') description = module.params['description'] if str(description).startswith('http://') or str(description).startswith('https://'): module.fail_json(msg='description can not start with http:// or https://') multi = module.params['multi_ok'] recent = module.params['recent'] if multi and recent: module.fail_json(msg='multi_ok and recent can not be True at the same time.') changed = False group = group_exists(ecs, module, module.params['vpc_id'], group_name, security_group_id, multi, recent) if state == 'absent': if not group: module.exit_json(changed=changed, group={}) try: module.exit_json(changed=group.delete(), group={}) except ECSResponseError as e: module.fail_json(msg="Deleting security group {0} is failed. Error: {1}".format(group.id, e)) if not group: try: params = module.params params['security_group_name'] = group_name params['client_token'] = "Ansible-Alicloud-%s-%s" % (hash(str(module.params)), str(time.time())) group = ecs.create_security_group(**params) changed = True except ECSResponseError as e: module.fail_json(changed=changed, msg='Creating a security group is failed. Error: {0}'.format(e)) if not description: description = group.description if group.modify(name=group_name, description=description): changed = True # validating rules if provided ingress_rules = module.params['rules'] if ingress_rules: direction = 'ingress' for rule in ingress_rules: validate_group_rule_keys(module, rule, direction) if module.params['purge_rules']: for existing in group.permissions: if existing['direction'] != direction: continue if purge_rules(module, group, existing, ingress_rules, direction): changed = True for rule in ingress_rules: if group.authorize(rule, direction): changed = True egress_rules = module.params['rules_egress'] if egress_rules: direction = 'egress' for rule in egress_rules: validate_group_rule_keys(module, rule, direction) if module.params['purge_rules_egress']: for existing in group.permissions: if existing['direction'] != direction: continue if purge_rules(module, group, existing, egress_rules, direction): changed = True for rule in egress_rules: if group.authorize(rule, direction): changed = True module.exit_json(changed=changed, group=group.get().read()) if __name__ == '__main__': main()