in rdk/template/runtime/python3.6-managed/managed-rule-code/rule_util.py [0:0]
def rule_handler(lambda_handler):
    """Decorate a rule function so that its result is reported as evaluations.

    The returned wrapper parses the incoming event, calls ``lambda_handler``
    when appropriate, builds the list of evaluations and hands it to
    ``aws_config.put_evaluations``.

    Args:
        lambda_handler: Callable invoked as ``lambda_handler(event, context)``.
            When a configuration item is available its return value is used
            as the ``ComplianceType`` of a single evaluation; otherwise it is
            expected to return a list of evaluation dicts.

    Returns:
        The wrapping function, taking ``(event, context)``. It returns the
        value produced by ``lambda_handler``, or ``'NOT_APPLICABLE'`` when the
        rule was not applicable or a non-list was returned without a
        configuration item.
    """
    # Function-scope import so the decorator does not depend on the module's
    # import block.
    import functools

    # Every custom evaluation must carry all of these keys to be reported.
    required_fields = (
        'ComplianceResourceType',
        'ComplianceResourceId',
        'ComplianceType',
        'OrderingTimestamp',
    )

    @functools.wraps(lambda_handler)  # keep the rule's __name__/__doc__
    def handler_wrapper(event, context):
        evaluations = []

        print(event)
        check_defined(event, 'event')
        invoking_event = json.loads(event['invokingEvent'])
        if 'ruleParameters' in event:
            # The parsed value is not used here; parsing is kept so that
            # malformed JSON still fails before the rule function runs.
            json.loads(event['ruleParameters'])

        configuration_item = get_configuration_item(invoking_event)

        if configuration_item is None:
            # No configuration item: the rule function supplies the
            # evaluations itself.
            compliance = lambda_handler(event, context)
            if not isinstance(compliance, list):
                # Nothing is reported in this case.
                return "NOT_APPLICABLE"

            for evaluation in compliance:
                missing_fields = False
                # Check every field (no early exit) so each missing one is
                # logged.
                for field in required_fields:
                    if field not in evaluation:
                        print("Missing " + field + " from custom evaluation.")
                        missing_fields = True

                # Incomplete evaluations are logged above and dropped.
                if not missing_fields:
                    evaluations.append(evaluation)
        else:
            # Put the resolved configuration item back into the event so the
            # rule function sees it inside 'invokingEvent'.
            invoking_event['configurationItem'] = configuration_item
            event['invokingEvent'] = json.dumps(invoking_event)
            compliance = 'NOT_APPLICABLE'

            if is_applicable(configuration_item, event):
                # Invoke the compliance checking function.
                compliance = lambda_handler(event, context)

            evaluations = [{
                'ComplianceResourceType': configuration_item['resourceType'],
                'ComplianceResourceId': configuration_item['resourceId'],
                'ComplianceType': compliance,
                'OrderingTimestamp': configuration_item['configurationItemCaptureTime'],
            }]

        # Put together the request that reports the evaluation status
        result_token = event['resultToken']
        # Used solely for RDK test to skip actual put_evaluation API call
        test_mode = result_token == 'TESTMODE'

        # Invoke the Config API to report the result of the evaluation
        aws_config.put_evaluations(
            Evaluations=evaluations,
            ResultToken=result_token,
            TestMode=test_mode,
        )
        # Used solely for RDK test to be able to test Lambda function
        return compliance

    return handler_wrapper