in IAM Access Denied Responder/generate-security-messages/index.py [0:0]
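def publish_iam_user_history(event, context, *, max_events=5):
    """Publish recent CloudTrail activity of the IAM user named in an access-denied event.

    The event arrives in one of two shapes: an SNS-wrapped EventBridge
    notification, where ``event['Records'][0]['Sns']['Message']`` is a JSON
    string whose ``detail`` key is the CloudTrail record, or the bare
    EventBridge event, where ``event['detail']`` is that record. Identities of
    type ``AssumedRole`` are not handled by this function; it returns early
    without publishing anything for them.

    When the ``APIKey`` environment variable is set and non-empty, the source
    IP of every reported event is sent to the external geo-location and whois
    helpers (``getIPGeoDetails`` / ``getIPWhoisDetails``) and their answer is
    appended to the message. Non-IP values (CloudTrail writes a service DNS
    name or "AWS Internal" when an AWS service acted on the user's behalf) are
    not sent to those services.

    Args:
        event: The triggering event, in one of the two shapes above.
        context: Lambda context object; unused.
        max_events: Upper bound on the number of CloudTrail events quoted in
            the message. At most 50 are requested from CloudTrail per call.

    Returns:
        None. The result is delivered as a side effect: one ``sns:Publish`` to
        the topic ARN in the ``TopicTarget`` environment variable, with the
        body ``{"TextMessage": <history text>}``.

    Raises:
        KeyError: if the event lacks the ``userIdentity`` structure, the
            identity has no ``userName``, or ``TopicTarget`` is not set.
        ValueError: if ``max_events`` is less than 1.
    """
    if max_events < 1:
        raise ValueError("max_events must be at least 1")

    useridentity = _user_identity_from_event(event)
    if useridentity['type'] == "AssumedRole":
        # Role sessions are reported by a different path; nothing to publish here.
        return

    # NOTE(review): identities other than IAMUser (e.g. Root) may carry no
    # userName; the KeyError that results is surfaced rather than hidden — confirm
    # that is the intended behaviour for those identity types.
    username = useridentity['userName']

    cloudtrail = boto3.client('cloudtrail')
    response = cloudtrail.lookup_events(
        LookupAttributes=[
            {
                'AttributeKey': 'ResourceName',
                'AttributeValue': username
            },
        ],
        # Only max_events entries are reported, so do not fetch a larger page;
        # CloudTrail caps MaxResults at 50.
        MaxResults=min(max_events, 50),
    )
    events = response['Events'][:max_events]

    # Geo/whois enrichment is opt-in: it sends source IPs to external services
    # and needs an API key, so it is gated on the key being present and non-empty.
    enrich = bool(os.environ.get('APIKey'))

    if events:
        lines = ['\nHistory of IAM user:\n']
        for lookup_event in events:
            cloudtrail_event = json.loads(lookup_event['CloudTrailEvent'])
            lines.append(_format_event_line(lookup_event, cloudtrail_event))
            if enrich:
                lines.append(_ip_enrichment_line(cloudtrail_event.get('sourceIPAddress')))
        history = ''.join(lines)
    else:
        history = '\n No previous history reported for the user'

    sns = boto3.client('sns')
    sns.publish(
        TopicArn=os.environ['TopicTarget'],
        Message=json.dumps({'TextMessage': history}),
    )


def _user_identity_from_event(event):
    """Return the CloudTrail ``userIdentity`` dict from either accepted event shape.

    Args:
        event: SNS-wrapped notification (``Records``) or bare EventBridge event (``detail``).

    Returns:
        The ``userIdentity`` mapping of the embedded CloudTrail record.

    Raises:
        KeyError: if the expected keys are missing.
    """
    if 'Records' in event:
        # SNS delivery: the EventBridge event is a JSON string in the SNS message body.
        detail = json.loads(event['Records'][0]['Sns']['Message'])['detail']
    else:
        detail = event['detail']
    return detail['userIdentity']


def _format_event_line(lookup_event, cloudtrail_event):
    """Return one message line for a CloudTrail event.

    Args:
        lookup_event: One entry of ``lookup_events()['Events']``.
        cloudtrail_event: The parsed ``CloudTrailEvent`` JSON of that entry.

    Returns:
        ``"<time>, Action: <name>, performed by <user>, from <ip>, user agent <ua>\\n"``;
        fields absent from the event are rendered as ``N/A``.
    """
    # The user agent is part of the line: it is a cheap indicator of whether
    # the calls came from the console, the CLI or an SDK.
    return '{0}, Action: {1}, performed by {2}, from {3}, user agent {4}\n'.format(
        str(lookup_event.get('EventTime', 'N/A')),  # date/time
        lookup_event.get('EventName', 'N/A'),  # action
        lookup_event.get('Username', 'N/A'),  # user/role name
        cloudtrail_event.get('sourceIPAddress', 'N/A'),  # ip
        cloudtrail_event.get('userAgent', 'N/A'),  # useragent
    )


def _ip_enrichment_line(source_ip):
    """Return a message line describing where ``source_ip`` is located and who owns it.

    Calls the external ``getIPGeoDetails`` and ``getIPWhoisDetails`` helpers
    (defined elsewhere in this module). Missing fields in their responses are
    rendered as ``N/A`` instead of raising.

    Args:
        source_ip: The ``sourceIPAddress`` of a CloudTrail event, or None.

    Returns:
        The enrichment line, newline-terminated. If ``source_ip`` is not an IP
        literal no external lookup is made and a line saying so is returned.
    """
    import ipaddress  # local import: the module's import block is outside this listing

    try:
        ipaddress.ip_address(source_ip)
    except ValueError:
        # CloudTrail reports a service DNS name (e.g. "cloudformation.amazonaws.com")
        # or "AWS Internal" when an AWS service made the call on the user's behalf;
        # those values are not IPs and must not be sent to the lookup services.
        return 'Source {0} is not an IP address; no location lookup performed\n'.format(source_ip)

    ip_geo_data = getIPGeoDetails(source_ip) or {}
    location = ip_geo_data.get('location') or {}
    country = location.get('country', 'N/A')
    region = location.get('region', 'N/A')
    city = location.get('city', 'N/A')

    ip_whois_data = getIPWhoisDetails(source_ip) or {}
    registrant = (ip_whois_data.get('WhoisRecord') or {}).get('registrant') or {}
    owner = registrant.get('organization', 'N/A')

    return 'This IP is located in {0}, {1}, {2} and is owned by {3}\n'.format(city, region, country, owner)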