in src/olympia/scanners/models.py [0:0]
def run_action(cls, version):
    """Look up the action that applies to ``version`` and run it.

    The action is derived from the scanner results attached to the version
    and the active rules those results matched. The selected action is
    executed synchronously from this method; it is not deferred to a task.

    Versions whose add-on is not an extension are skipped entirely. Before
    any rule is considered, the MAD scanner result (when one exists) is
    inspected: a customs score at either extreme (<= 0.01 or >= 0.99), or
    models that do not agree, gets the version flagged for human review.

    Raises ``Exception`` when the selected rule carries an action id that is
    absent from ``ACTIONS`` or that has no function mapped to it.
    """
    log.info('Checking rules and actions for version %s.', version.pk)

    # Guard: only extensions are subject to scanner actions.
    if version.addon.type != ADDON_EXTENSION:
        log.info(
            'Not running action(s) on version %s which belongs to a non-extension.',
            version.pk,
        )
        return

    # MAD check, independent from the rule-based action handled below.
    try:
        mad_queryset = cls.objects.filter(version=version, scanner=MAD)
        mad_payload = mad_queryset.get().results
        customs_data = mad_payload.get('scanners', {}).get('customs', {})
        # Missing values fall back to defaults that do not trigger a flag.
        score = customs_data.get('score', 0.5)
        details = customs_data.get('result_details', {})
        models_agree = details.get('models_agree', True)
        score_is_extreme = score <= 0.01 or score >= 0.99
        if score_is_extreme or not models_agree:
            log.info('Flagging version %s for human review by MAD.', version.pk)
            _flag_for_human_review_by_scanner(
                version=version, rule=None, scanner=MAD
            )
    except cls.DoesNotExist:
        log.info('No MAD scanner result for version %s.', version.pk)

    # Select the active rule matched by a result of this version. Ordering
    # is descending on `action`, so `first()` yields the rule with the
    # highest action value.
    related_name = cls._meta.get_field('matched_rules').related_query_name()
    rule_filters = {f'{related_name}__version': version, 'is_active': True}
    candidates = cls.rule_model.objects.filter(**rule_filters).order_by('-action')
    rule = candidates.first()

    if not rule:
        log.info('No action to execute for version %s.', version.pk)
        return

    action_id = rule.action
    action_name = ACTIONS.get(action_id)
    if not action_name:
        raise Exception('invalid action %s' % action_id)

    # Dispatch table: action id -> function implementing that action.
    handlers = {
        NO_ACTION: _no_action,
        FLAG_FOR_HUMAN_REVIEW: _flag_for_human_review,
        DELAY_AUTO_APPROVAL: _delay_auto_approval,
        DELAY_AUTO_APPROVAL_INDEFINITELY: _delay_auto_approval_indefinitely,
        DELAY_AUTO_APPROVAL_INDEFINITELY_AND_RESTRICT: (
            _delay_auto_approval_indefinitely_and_restrict
        ),
        DELAY_AUTO_APPROVAL_INDEFINITELY_AND_RESTRICT_FUTURE_APPROVALS: (
            _delay_auto_approval_indefinitely_and_restrict_future_approvals
        ),
    }
    handler = handlers.get(action_id)
    if not handler:
        raise Exception('no implementation for action %s' % action_id)

    # The action is known and implemented: run it, logging both ends.
    log.info('Starting action "%s" for version %s.', action_name, version.pk)
    handler(version=version, rule=rule)
    log.info('Ending action "%s" for version %s.', action_name, version.pk)