in python/IAM_USER_USED_LAST_90_DAYS/IAM_USER_USED_LAST_90_DAYS.py [0:0]
def lambda_handler(event, context):
    """Evaluate the rule for one AWS Config invocation and report the result.

    The handler:

    1. Optionally replaces the module-level ``AWS_CONFIG_CLIENT`` with one
       obtained from ``get_client('config', event)`` when ``ASSUME_ROLE_MODE``
       is truthy.
    2. Parses ``event['invokingEvent']`` and the optional
       ``event['ruleParameters']`` (both JSON strings).
    3. Normalizes the rule parameters ``NotUsedTimeOutInDays`` (default 90),
       ``NewUserCooldownInDays`` (default 7) and ``WhitelistedUserList``
       (comma-separated string, default empty list).
    4. Dispatches on ``invoking_event['messageType']`` to the change-triggered
       or scheduled evaluation function.
    5. Converts the compliance result (``str``, ``list`` or ``dict``) into a
       list of evaluations and sends them with ``put_evaluations`` in batches
       of 100.

    Args:
        event: Invocation payload. Must contain ``invokingEvent`` and
            ``resultToken``; ``ruleParameters`` is optional.
        context: Lambda context object; not used by this function.

    Returns:
        The list of evaluations that were reported, or an error response
        (as produced by the ``build_*_error_response`` helpers, or a dict with
        an ``internalErrorMessage`` key for an unexpected message type) when
        the invocation could not be evaluated.
    """
    global AWS_CONFIG_CLIENT

    try:
        if ASSUME_ROLE_MODE:
            AWS_CONFIG_CLIENT = get_client('config', event)
    except Exception as ex:
        # Not every exception carries a botocore-style ``response`` mapping;
        # reading ``ex.response`` unconditionally would raise AttributeError
        # here and hide the original failure.
        error_code, error_message = _error_code_and_message(ex)
        return build_error_response(
            'Encountered error while making API request',
            str(ex),
            error_code,
            error_message,
        )

    # liblogging.logEvent(event)
    check_defined(event, 'event')
    invoking_event = json.loads(event['invokingEvent'])

    rule_parameters = {}
    if 'ruleParameters' in event:
        rule_parameters = json.loads(event['ruleParameters'])

    expiration_days = 90  # default
    if 'NotUsedTimeOutInDays' in rule_parameters:
        try:
            expiration_days = int(str(rule_parameters['NotUsedTimeOutInDays']))
        except ValueError as ex:
            return build_invalid_integer_error_response(ex)
        # 999999999 is the max value supported by the time delta function.
        if expiration_days < 0 or expiration_days > 999999999:
            return build_unsupported_expiration_days_error_response()

    new_user_cool_down = 7  # default
    if 'NewUserCooldownInDays' in rule_parameters:
        try:
            new_user_cool_down = int(str(rule_parameters['NewUserCooldownInDays']))
        except ValueError as ex:
            return build_invalid_integer_error_response(ex)

    whitelisted_users = []  # default
    if 'WhitelistedUserList' in rule_parameters:
        try:
            # Accept both "a,b" and "a, b" as separators.
            whitelisted_users = rule_parameters['WhitelistedUserList'].replace(', ', ',').split(',')
            validate_whitelist(whitelisted_users)
        except (AttributeError, ValueError) as ex:
            # AttributeError: the parameter is not a string (no .replace()).
            return build_invalid_str_error_response(ex)

    # From here on the evaluators only see the normalized parameter values.
    rule_parameters = {
        'NotUsedTimeOutInDays': expiration_days,
        'WhitelistedUserList': whitelisted_users,
        'NewUserCooldownInDays': new_user_cool_down
    }

    try:
        configuration_item = get_configuration_item(invoking_event)
        message_type = invoking_event['messageType']
        if message_type == 'ConfigurationItemChangeNotification':
            # liblogging.logCIMetadata(event)
            compliance_result = evaluate_changetrigger_compliance(event, configuration_item, rule_parameters)
        elif message_type == 'ScheduledNotification':
            compliance_result = evaluate_scheduled_compliance(event, configuration_item, rule_parameters)
        else:
            return {'internalErrorMessage': 'Unexpected message type ' + str(invoking_event)}
    except botocore.exceptions.ClientError as ex:
        if is_internal_error(ex):
            return build_internal_error_response("Unexpected error while completing API request", str(ex))
        return build_error_response(
            "Customer error while making API request",
            str(ex),
            ex.response['Error']['Code'],
            ex.response['Error']['Message'],
        )
    except ValueError as ex:
        return build_internal_error_response(str(ex), str(ex))

    evaluations = []
    if isinstance(compliance_result, str):
        evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result))
    elif isinstance(compliance_result, list):
        # Keep only well-formed evaluations, then let the clean-up helper
        # produce the final list to report.
        latest_evaluations = [
            evaluation for evaluation in compliance_result
            if _has_required_evaluation_fields(evaluation)
        ]
        evaluations = clean_up_old_evaluations(latest_evaluations, event)
    elif isinstance(compliance_result, dict):
        if _has_required_evaluation_fields(compliance_result):
            evaluations.append(compliance_result)
    else:
        evaluations.append(build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE'))

    # Put together the request that reports the evaluation status.
    result_token = event['resultToken']
    # 'TESTMODE' is used solely for RDK test to skip the actual put_evaluation API call.
    test_mode = result_token == 'TESTMODE'

    # Invoke the Config API to report the result of the evaluation,
    # at most 100 evaluations per call.
    for start in range(0, len(evaluations), 100):
        AWS_CONFIG_CLIENT.put_evaluations(
            Evaluations=evaluations[start:start + 100],
            ResultToken=result_token,
            TestMode=test_mode,
        )

    # Used solely for RDK test to be able to test Lambda function
    return evaluations


def _error_code_and_message(ex):
    """Return an ``(error_code, error_message)`` pair describing *ex*.

    When *ex* has a ``response`` dict with an ``'Error'`` dict (the layout the
    rest of this handler reads from ``botocore.exceptions.ClientError``), its
    ``'Code'`` and ``'Message'`` entries are used. Otherwise the exception's
    class name and ``str(ex)`` are returned, so callers never fail while
    building an error response.
    """
    response = getattr(ex, 'response', None)
    error = response.get('Error') if isinstance(response, dict) else None
    if not isinstance(error, dict):
        error = {}
    return error.get('Code', type(ex).__name__), error.get('Message', str(ex))


def _has_required_evaluation_fields(evaluation):
    """Return True when *evaluation* contains every required evaluation field.

    Every missing field is printed (not just the first one) so that a single
    run reports all problems with a custom evaluation.
    """
    required_fields = ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp')
    complete = True
    for field in required_fields:
        if field not in evaluation:
            print("Missing " + field + " from custom evaluation.")
            complete = False
    return complete