in installer/resources/src/find_existing_resources.py [0:0]
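def get_rules_for_security_group(self, sg_ids):
    """Summarise the ingress and egress rules of the given security groups.

    All security groups are paginated once via ``self.ec2`` and the requested
    ids are picked out of that single pass, instead of re-listing every group
    for each requested id.

    Args:
        sg_ids: iterable of security group ids to look up.

    Returns:
        dict: ``{'success': True, 'message': rules}`` where ``rules`` maps each
        requested group id that was found to a list of
        ``{'from_port', 'to_port', 'approved_ips', 'type'}`` dicts, with
        ``type`` being ``'ingress'`` or ``'egress'``. On any exception the
        result is ``{'success': False, 'message': str(err)}``.

    Requested ids that describe_security_groups does not return are left out
    of ``rules`` without an error, so a caller that needs every group must
    compare the returned keys with the ids it asked for.
    """

    def summarise(permissions, direction):
        # Flatten one direction's permission entries into the rule dicts
        # this method reports.
        summary = []
        for permission in permissions or []:
            if 'FromPort' in permission:
                from_port = permission['FromPort']
                to_port = permission['ToPort']
            else:
                # IpProtocol = -1 -> All Traffic
                from_port, to_port = 0, 65535
            # NOTE(review): only IPv4 CIDRs and referenced group ids are
            # collected. Ipv6Ranges and PrefixListIds are not read, so a rule
            # open only to an IPv6 range or a prefix list is reported with an
            # empty approved_ips. IpProtocol is not recorded either; for ICMP
            # rules FromPort/ToPort carry the ICMP type/code, not ports.
            # Confirm whether callers rely on this summary for exposure checks.
            approved_ips = [
                ip_range['CidrIp']
                for ip_range in permission.get('IpRanges') or []
                if 'CidrIp' in ip_range
            ]
            # Missing optional keys are treated as empty so one sparse
            # permission entry does not fail the whole lookup.
            approved_ips.extend(
                pair['GroupId']
                for pair in permission.get('UserIdGroupPairs') or []
                if 'GroupId' in pair
            )
            summary.append({'from_port': from_port,
                            'to_port': to_port,
                            'approved_ips': approved_ips,
                            'type': direction})
        return summary

    try:
        requested = list(sg_ids)
        found = {}
        if requested:
            wanted = set(requested)
            paginator = self.ec2.get_paginator("describe_security_groups")
            for page in paginator.paginate():
                for sg in page['SecurityGroups']:
                    group_id = sg['GroupId']
                    if group_id not in wanted:
                        continue
                    found[group_id] = (
                        summarise(sg.get('IpPermissions'), 'ingress')
                        + summarise(sg.get('IpPermissionsEgress'), 'egress')
                    )
        # Keep the result keyed in the order the ids were requested.
        rules = {}
        for sg_id in requested:
            if sg_id in found:
                rules[sg_id] = found[sg_id]
        return {'success': True,
                'message': rules}
    except Exception as err:
        # Callers get the failure as data rather than as an exception.
        return {'success': False,
                'message': str(err)}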