in rdklib/evaluator.py [0:0]
def handle(self, event, context):
check_defined(event, 'event')
client_factory = ClientFactory(self.__rdk_rule.get_execution_role_arn(event))
invoking_event = init_event(event, client_factory)
rule_parameters = {}
if 'ruleParameters' in event:
rule_parameters = json.loads(event['ruleParameters'])
try:
valid_rule_parameters = self.__rdk_rule.evaluate_parameters(rule_parameters)
except InvalidParametersError as ex:
return build_parameters_value_error_response(ex)
try:
if invoking_event['messageType'] == 'ScheduledNotification':
compliance_result = self.__rdk_rule.evaluate_periodic(event, client_factory, valid_rule_parameters)
return process_periodic_evaluations_list(event, client_factory, compliance_result, self.__rdk_rule)
if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'OversizedConfigurationItemChangeNotification']:
if not self.__expected_resource_types:
raise Exception("Change triggered rules must provide expected resource types")
configuration_item = get_configuration_item(invoking_event)
if is_applicable_status(configuration_item, event) and is_applicable_resource_type(configuration_item, self.__expected_resource_types):
compliance_result = self.__rdk_rule.evaluate_change(event, client_factory, configuration_item, valid_rule_parameters)
else:
compliance_result = [Evaluation(ComplianceType.NOT_APPLICABLE)]
return process_event_evaluations_list(event, client_factory, compliance_result, configuration_item)
return build_internal_error_response('Unexpected message type', str(invoking_event))
except botocore.exceptions.ClientError as ex:
error_code = ex.response['Error']['Code']
if 'AccessDenied' in error_code or 'UnauthorizedOperation' in error_code:
return build_error_response(
"Insufficient access to perform this action.", str(ex),
ex.response['Error']['Code'],
ex.response['Error']['Message'])
if is_internal_error(ex):
return build_internal_error_response("Unexpected error while completing API request", str(ex))
return build_error_response("Customer error while making API request", str(ex), ex.response['Error']['Code'], ex.response['Error']['Message'])
except ValueError as ex:
return build_internal_error_response(str(ex), str(ex))