jbi/runner.py (260 lines of code) (raw):

""" Execute actions from Webhook requests """ import inspect import itertools import logging import re from typing import Optional from dockerflow.logging import request_id_context from statsd.defaults.env import statsd from jbi import ActionResult, Operation, jira from jbi import steps as steps_module from jbi.bugzilla import models as bugzilla_models from jbi.bugzilla.client import BugNotAccessibleError from jbi.bugzilla.service import get_service as get_bugzilla_service from jbi.environment import get_settings from jbi.errors import ActionNotFoundError, IgnoreInvalidRequestError from jbi.models import ( Action, ActionContext, ActionParams, Actions, ActionSteps, JiraContext, RunnerContext, ) from jbi.queue import DeadLetterQueue from jbi.steps import StepStatus logger = logging.getLogger(__name__) settings = get_settings() GROUP_TO_OPERATION = { "new": Operation.CREATE, "existing": Operation.UPDATE, "comment": Operation.COMMENT, "attachment": Operation.ATTACHMENT, } def groups2operation(steps: ActionSteps): """In the configuration files, the steps are grouped by `new`, `existing`, and `comment`. Internally, this correspond to enums of `Operation`. This helper remaps the list of steps. """ try: by_operation = { GROUP_TO_OPERATION[entry]: steps_list for entry, steps_list in steps.model_dump().items() } except KeyError as err: raise ValueError(f"Unsupported entry in `steps`: {err}") from err return by_operation def lookup_actions(bug: bugzilla_models.Bug, actions: Actions) -> list[Action]: """ Find matching actions from bug's whiteboard field. Tags are strings between brackets and can have prefixes/suffixes using dashes (eg. ``[project]``, ``[project-moco]``, ``[project-moco-sprint1]``). """ if bug.whiteboard: relevant_actions = [] for tag, action in actions.by_tag.items(): # [tag-word], [tag-], [tag], but not [word-tag] or [tagword] search_string = r"\[" + tag + r"(-[^\]]*)*\]" if re.search(search_string, bug.whiteboard, flags=re.IGNORECASE): relevant_actions.append(action) if len(relevant_actions): return relevant_actions raise ActionNotFoundError(", ".join(actions.by_tag.keys())) class Executor: """Callable class that runs step functions for an action.""" def __init__( self, parameters: ActionParams, bugzilla_service=None, jira_service=None ): self.parameters = parameters if not bugzilla_service: self.bugzilla_service = get_bugzilla_service() if not jira_service: self.jira_service = jira.get_service() self.steps = self._initialize_steps(parameters.steps) self.step_func_params = { "parameters": self.parameters, "bugzilla_service": self.bugzilla_service, "jira_service": self.jira_service, } def _initialize_steps(self, steps: ActionSteps): steps_by_operation = groups2operation(steps) steps_callables = { group: [getattr(steps_module, step_str) for step_str in steps_list] for group, steps_list in steps_by_operation.items() } return steps_callables def build_step_kwargs(self, func) -> dict: """Builds a dictionary of keyword arguments (kwargs) to be passed to the given `step` function. Args: func: The step function for which the kwargs are being built. Returns: A dictionary containing the kwargs that match the parameters of the function. """ function_params = inspect.signature(func).parameters return { key: value for key, value in self.step_func_params.items() if key in function_params.keys() } def __call__(self, context: ActionContext) -> ActionResult: """Called from `runner` when the action is used.""" has_produced_request = False for step in self.steps[context.operation]: context = context.update(current_step=step.__name__) step_kwargs = self.build_step_kwargs(step) try: result, context = step(context=context, **step_kwargs) if result == StepStatus.SUCCESS: statsd.incr(f"jbi.steps.{step.__name__}.count") elif result == StepStatus.INCOMPLETE: # Step did not execute all its operations. statsd.incr( f"jbi.action.{context.action.whiteboard_tag}.incomplete.count" ) except Exception: if has_produced_request: # Count the number of workflows that produced at least one request, # but could not complete entirely with success. statsd.incr( f"jbi.action.{context.action.whiteboard_tag}.aborted.count" ) raise step_responses = context.responses_by_step[step.__name__] if step_responses: has_produced_request = True for response in step_responses: logger.info( "Received %s", response, extra={ "response": response, **context.model_dump(), }, ) # Flatten the list of all received responses. responses = list( itertools.chain.from_iterable(context.responses_by_step.values()) ) return True, {"responses": responses} async def execute_or_queue( request: bugzilla_models.WebhookRequest, queue: DeadLetterQueue, actions: Actions ): request_id = request_id_context.get() if await queue.is_blocked(request): # If it's blocked, store it and wait for it to be processed later. await queue.postpone(request, rid=request_id) logger.info( "%r event on Bug %s was put in queue for later processing.", request.event.action, request.bug.id, extra={"payload": request.model_dump()}, ) return {"status": "skipped"} try: return execute_action(request, actions) except IgnoreInvalidRequestError as exc: return {"status": "invalid", "error": str(exc)} except Exception as exc: item = await queue.track_failed(request, exc, rid=request_id) logger.exception( "Failed to process %r event on Bug %s. %s was put in queue.", request.event.action, request.bug.id, item.identifier, extra={ "payload": request.model_dump(), "item": item.model_dump(), }, ) return {"status": "failed", "error": str(exc)} @statsd.timer("jbi.action.execution.timer") def execute_action( request: bugzilla_models.WebhookRequest, actions: Actions, ): """Execute the configured actions for the specified `request`. If multiple actions are configured for a given request, all of them are executed. This will raise an `IgnoreInvalidRequestError` error if the request does not contain bug data or does not match any action. A dictionary containing the values returned by the actions calls is returned. The action tag is used to index the responses in the dictionary. """ bug, event = request.bug, request.event runner_context = RunnerContext( bug=bug, event=event, operation=Operation.HANDLE, ) try: if bug.is_private: raise IgnoreInvalidRequestError("private bugs are not supported") try: relevant_actions = lookup_actions(bug, actions) except ActionNotFoundError as err: raise IgnoreInvalidRequestError( f"no bug whiteboard matching action tags: {err}" ) from err logger.info( "Handling incoming request", extra=runner_context.model_dump(), ) try: bug = get_bugzilla_service().refresh_bug_data(bug) except BugNotAccessibleError as err: # This can happen if the bug is made private after the webhook # is processed (eg. if it spent some time in the DL queue) raise IgnoreInvalidRequestError(str(err)) from err runner_context = runner_context.update(bug=bug, actions=relevant_actions) return do_execute_actions(runner_context, bug, relevant_actions) except IgnoreInvalidRequestError as exception: logger.info( "Ignore incoming request: %s", exception, extra=runner_context.update(operation=Operation.IGNORE).model_dump(), ) statsd.incr("jbi.bugzilla.ignored.count") raise @statsd.timer("jbi.action.execution.timer") def do_execute_actions( runner_context: RunnerContext, bug: bugzilla_models.Bug, actions: Actions, ): """Execute the provided actions on the bug, within the provided context. This will raise an `IgnoreInvalidRequestError` error if the request does not contain bug data or does not match any action. A dictionary containing the values returned by the actions calls is returned. The action tag is used to index the responses in the dictionary. """ runner_context = runner_context.update(bug=bug) runner_context = runner_context.update(actions=actions) event = runner_context.event details = {} for action in actions: linked_issue_key: Optional[str] = bug.extract_from_see_also( project_key=action.jira_project_key ) action_context = ActionContext( action=action, bug=bug, event=event, operation=Operation.IGNORE, jira=JiraContext(project=action.jira_project_key, issue=linked_issue_key), extra={k: str(v) for k, v in action.parameters.model_dump().items()}, ) if action_context.jira.issue is None: if event.target == "bug": action_context = action_context.update(operation=Operation.CREATE) else: # Check that issue exists (and is readable) jira_issue = jira.get_service().get_issue( action_context.update(operation=Operation.HANDLE), action_context.jira.issue, ) if not jira_issue: raise IgnoreInvalidRequestError( f"ignore unreadable issue {action_context.jira.issue}" ) # Make sure that associated project in configuration matches the # project of the linked Jira issue (see #635) if ( project_key := jira_issue["fields"]["project"]["key"] ) != action_context.jira.project: # TODO: We're now executing multiple actions for a given bug, we # should probably either not fail and instead report which actions # failed to apply, or execute all the changes as a "transaction" and # roll them back if one of them fails. raise IgnoreInvalidRequestError( f"ignore linked project {project_key!r} (!={action_context.jira.project!r})" ) if event.target == "bug": action_context = action_context.update( operation=Operation.UPDATE, extra={ "changed_fields": ", ".join(event.changed_fields()), **action_context.extra, }, ) elif event.target == "comment": action_context = action_context.update(operation=Operation.COMMENT) elif event.target == "attachment": action_context = action_context.update(operation=Operation.ATTACHMENT) if action_context.operation == Operation.IGNORE: raise IgnoreInvalidRequestError( f"ignore event target {action_context.event.target!r}" ) logger.info( "Execute action '%s' for Bug %s", action.whiteboard_tag, bug.id, extra=runner_context.update(operation=Operation.EXECUTE).model_dump(), ) executor = Executor(parameters=action.parameters) handled, action_details = executor(context=action_context) details[action.whiteboard_tag] = action_details statsd.incr(f"jbi.operation.{action_context.operation.lower()}.count") logger.info( "Action %r executed successfully for Bug %s", action.whiteboard_tag, bug.id, extra=runner_context.update( operation=Operation.SUCCESS if handled else Operation.IGNORE ).model_dump(), ) statsd.incr("jbi.bugzilla.processed.count") return details