in aws_resource_scan.py [0:0]
def resources(self):
    """Transform the response into a dict of names to resource listings.

    Works on a copy (``.copy()``) of ``self.input.response``: the
    ``ResponseMetadata`` entry is dropped, ``filter_aws.apply_filters`` is
    run on the copy and then, unless disabled through ``self.unfilter``:

    * ``'kmsListKeys'``: for service ``kms`` / operation ``ListKeys``, keys
      targeted by an alias whose name starts with ``alias/aws`` are removed.
    * ``'ec2InternetGateways'``: for service ``ec2`` / operation
      ``DescribeInternetGateways``, gateways whose single attachment is a
      VPC flagged ``IsDefault`` are removed.

    Both special cases read a sibling
    ``<service>_<Operation>_<region>.json`` file from ``self.directory``.
    They are best effort: on failure the listing is left unfiltered and
    ``repr`` of the exception is stored in ``self.input.error``.

    Args:
        self: instance of the class.

    Returns:
        resource listing: the filtered copy of the response, every value
        being a list. ``'truncated': [True]`` is added when
        ``apply_filters`` returns a falsy completeness flag. A falsy
        response yields an empty copy (``{}`` when it is ``None``).

    Raises:
        ValueError: if any remaining response value is not a list.
    """
    raw_response = self.input.response
    if not raw_response:
        # Nothing to transform; ``None`` has no ``.copy()``, so map it to {}.
        return {} if raw_response is None else raw_response.copy()

    response = raw_response.copy()
    # Tolerate responses that carry no metadata entry.
    response.pop('ResponseMetadata', None)
    complete = filter_aws.apply_filters(self.input, self.unfilter,
                                        response, True)
    unfilter_list = self.unfilter

    def load_sibling_response(sibling_operation):
        """Return the stored response of another operation (same region)."""
        file_name = '{}_{}_{}.json'.format(self.input.service,
                                           sibling_operation,
                                           self.input.region)
        path = self.directory + '/' + file_name
        # Context manager so the handle is closed even if parsing fails.
        with open(path, 'rb') as handle:
            return RawListing.from_json(json.load(handle)).response

    # Special handling for service-level kms keys; derived from alias name.
    if ('kmsListKeys' not in unfilter_list
            and self.input.service == 'kms'
            and self.input.operation == 'ListKeys'):
        try:
            list_aliases = load_sibling_response('ListAliases')
            service_key_ids = {
                alias.get('TargetKeyId')
                for alias in list_aliases.get('Aliases', [])
                # An alias without a name cannot match the prefix.
                if (alias.get('AliasName') or '').lower()
                .startswith('alias/aws')
            }
            response['Keys'] = [
                key for key in response.get('Keys', [])
                if key.get('KeyId') not in service_key_ids
            ]
        except Exception as exc:
            # Best effort: keep the unfiltered keys and record the reason.
            self.input.error = repr(exc)

    # Filter default Internet Gateways
    if ('ec2InternetGateways' not in unfilter_list
            and self.input.service == 'ec2'
            and self.input.operation == 'DescribeInternetGateways'):
        try:
            describe_vpcs = load_sibling_response('DescribeVpcs')
            vpcs = {vpc['VpcId']: vpc
                    for vpc in describe_vpcs.get('Vpcs', [])}
            internet_gateways = []
            for gateway in response['InternetGateways']:
                attachments = gateway.get('Attachments', [])
                # Only a gateway with exactly one attachment can be the
                # default one; any other gateway is kept in the listing.
                if len(attachments) == 1:
                    vpc_id = attachments[0].get('VpcId')
                    if vpcs.get(vpc_id, {}).get('IsDefault', False):
                        continue
                internet_gateways.append(gateway)
            response['InternetGateways'] = internet_gateways
        except Exception as exc:
            # Same best-effort policy as the kms filter above: a missing or
            # malformed DescribeVpcs file must not abort the listing.
            self.input.error = repr(exc)

    for key, value in response.items():
        if not isinstance(value, list):
            raise ValueError('No listing: {} is no list:'.format(key),
                             response)

    if not complete:
        response['truncated'] = [True]
    return response