in security_group_report/main.py [0:0]
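def _rule_peers(rule):
    """Return the peer identifiers referenced by one security-group rule.

    Collects, in this order, IPv4 CIDRs, IPv6 CIDRs, referenced security-group
    ids and prefix-list ids. An empty result is mapped to ``["0.0.0.0/0"]`` to
    keep the report's original convention.

    NOTE(review): that fallback makes a rule whose peer could not be read
    indistinguishable from a genuinely world-open rule; rows showing
    0.0.0.0/0 should be checked against the raw rule before being acted on.
    """
    peers = [r["CidrIp"] for r in rule.get("IpRanges", [])]
    peers += [r["CidrIpv6"] for r in rule.get("Ipv6Ranges", [])]
    peers += [r["GroupId"] for r in rule.get("UserIdGroupPairs", [])]
    peers += [r["PrefixListId"] for r in rule.get("PrefixListIds", [])]
    return peers or ["0.0.0.0/0"]


def _rule_ports(rule):
    """Return the port column for one rule.

    ``"any"`` when the rule carries no ports or FromPort is -1 (all ICMP
    types / all traffic), the single port when FromPort == ToPort, otherwise
    the ``"from - to"`` range as a string.
    """
    from_port = rule.get("FromPort", "any")
    to_port = rule.get("ToPort", "any")
    if from_port == -1:
        return "any"
    if from_port == to_port:
        return from_port
    return str(from_port) + " - " + str(to_port)


def _rule_protocol(rule):
    """Return the protocol column, mapping the AWS wildcard "-1" to "any"."""
    protocol = rule["IpProtocol"]
    return "any" if protocol == "-1" else protocol


def _rule_row(region, inst_name, inst_id, sg_name, sg_id, direction, source, destination, rule):
    """Build one report row (a dict keyed by the report columns)."""
    return {
        "Region": region,
        "Instance Name": inst_name,
        "Instance-ID": inst_id,
        "SG-Name": sg_name,
        "SG-ID": sg_id,
        "Direction": direction,
        "Source": source,
        "Destination": destination,
        "Protocol": _rule_protocol(rule),
        "Ports": _rule_ports(rule),
    }


def main():
    """Collect every instance's security-group rules across ``regions`` and
    write them to a timestamped Excel workbook.

    Inbound rules list the rule's peers as Source and the instance id as
    Destination; outbound rules are the reverse. Relies on the module-level
    ``regions``, ``get_sgs``, ``get_name`` and ``get_rules`` helpers.

    Returns:
        The result of ``DataFrame.to_excel`` (``None``), as before.
    """
    columns = [
        "Region",
        "Instance Name",
        "Instance-ID",
        "SG-Name",
        "SG-ID",
        "Direction",
        "Source",
        "Destination",
        "Protocol",
        "Ports",
    ]
    # Rows are accumulated in a list and turned into a DataFrame once:
    # DataFrame.append was deprecated in pandas 1.4 and removed in 2.0, and
    # appending row by row was quadratic anyway.
    rows = []
    print("Collecting Security Groups information from every region....")
    for region in regions:
        ec2r = boto3.resource("ec2", region)
        for instance in ec2r.instances.all():
            inst_id = instance.id
            inst_name = get_name(instance)
            for sg in get_sgs(instance):
                sg_id = sg["GroupId"]
                sg_name = sg["GroupName"]
                # Fetch the rule set once per security group; the original
                # issued the same lookup twice (once per direction).
                rules = get_rules(sg_id, region)
                rules_inbound = rules[0]
                rules_outbound = rules[1]
                for rule in rules_inbound:
                    rows.append(
                        _rule_row(
                            region, inst_name, inst_id, sg_name, sg_id,
                            "Inbound", _rule_peers(rule), inst_id, rule,
                        )
                    )
                for rule in rules_outbound:
                    rows.append(
                        _rule_row(
                            region, inst_name, inst_id, sg_name, sg_id,
                            "Outbound", inst_id, _rule_peers(rule), rule,
                        )
                    )
    df = pd.DataFrame(rows, columns=columns)
    # Named ``timestamp`` rather than ``time`` to avoid shadowing the stdlib module.
    timestamp = datetime.datetime.now().strftime("%H-%M-%S_%d-%m-%Y")
    file_name = "fw_policy-report-" + timestamp + ".xlsx"
    # Write first, then announce: the original printed the message before the
    # file existed, which misled when to_excel failed.
    result = df.to_excel(file_name)
    print(file_name + " has been created")
    return result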