jbi/jira/service.py (450 lines of code) (raw):
# This import is needed (as of Python 3.11) to enable type checking with modules
# imported under `TYPE_CHECKING`
# https://docs.python.org/3/whatsnew/3.7.html#pep-563-postponed-evaluation-of-annotations
# https://docs.python.org/3/whatsnew/3.11.html#pep-563-may-not-be-the-future
from __future__ import annotations
import concurrent
import json
import logging
from functools import lru_cache
from typing import Any, Iterable, Optional
import requests
from dockerflow import checks
from requests import exceptions as requests_exceptions
from jbi import Operation, environment
from jbi.bugzilla import models as bugzilla_models
from jbi.jira.utils import markdown_to_jira
from jbi.models import ActionContext
from .client import JiraClient, JiraCreateError
# Application settings, read once when this module is imported.
settings = environment.get_settings()
# Module-level logger shared by every `JiraService` method.
logger = logging.getLogger(__name__)
# Permission names passed to the client's `permitted_projects()` by
# `JiraService.check_jira_all_projects_have_permissions()`.
JIRA_REQUIRED_PERMISSIONS = {
    "ADD_COMMENTS",
    "CREATE_ISSUES",
    "DELETE_ISSUES",
    "EDIT_ISSUES",
}
class JiraService:
"""Used by action workflows to perform action-specific Jira tasks"""
def __init__(self, client: JiraClient) -> None:
    """Store the Jira client that every method of the service calls.

    Args:
        client: the Jira API client (`get_service()` passes a `JiraClient`).
    """
    self.client = client
def fetch_visible_projects(self) -> list[str]:
    """List the keys of the projects visible with the configured Jira credentials.

    Returns:
        The `"key"` entry of every project returned by the client's
        `permitted_projects()`, in the order they were received.
    """
    return [entry["key"] for entry in self.client.permitted_projects()]
def get_issue(self, context: ActionContext, issue_key):
    """Fetch a Jira issue, or return `None` when Jira answers with a 404.

    Args:
        context: action context, dumped into the `extra` of each log record.
        issue_key: key of the Jira issue to read.

    Returns:
        The client's response for the issue, or `None` if it was not found.

    Raises:
        requests.exceptions.HTTPError: for any HTTP error other than a 404.
    """
    log_extra = context.model_dump()
    logger.info("Getting issue %s", issue_key, extra=log_extra)
    try:
        issue = self.client.get_issue(issue_key)
    except requests_exceptions.HTTPError as http_error:
        # The error may carry no response at all, hence the `getattr` default.
        status_code = getattr(http_error.response, "status_code", None)
        if status_code == 404:
            logger.error(
                "Could not read issue %s: %s",
                issue_key,
                http_error,
                extra=log_extra,
            )
            return None
        raise
    logger.info(
        "Received issue %s",
        issue_key,
        extra={"response": issue, **log_extra},
    )
    return issue
def create_jira_issue(
    self, context: ActionContext, description: str, issue_type: str
):
    """Create a Jira issue for the bug in `context` and return the issue data.

    Args:
        context: action context; supplies the bug, the target Jira project
            and the action's `jira_char_limit`.
        description: Markdown text, run through `markdown_to_jira()` with
            `max_length` set to the action's `jira_char_limit`.
        issue_type: name of the Jira issue type to create.

    Returns:
        The issue data returned by the client (its first item when the
        client answers with a list).

    Raises:
        JiraCreateError: if the creation request fails with an HTTP error.
    """
    bug = context.bug
    fields: dict[str, Any] = {
        "summary": bug.summary,
        "issuetype": {"name": issue_type},
        "description": markdown_to_jira(
            description, max_length=context.action.parameters.jira_char_limit
        ),
        "project": {"key": context.jira.project},
    }
    logger.info(
        "Creating new Jira issue for Bug %s",
        bug.id,
        extra={"fields": fields, **context.model_dump()},
    )
    try:
        response = self.client.create_issue(fields=fields)
    except requests.HTTPError as exc:
        # Log whatever Jira answered. An HTTPError is not guaranteed to carry
        # a response, and an `assert` would be stripped under `python -O`.
        error_details: Any = None
        if exc.response is not None:
            try:
                error_details = exc.response.json()
            except ValueError:
                # `Response.json()` raises a `ValueError` subclass whose
                # concrete base depends on whether simplejson is installed,
                # so `json.JSONDecodeError` alone can miss it.
                error_details = exc.response.text
        logger.exception(
            "Failed to create issue for Bug %s",
            bug.id,
            extra={"response": error_details, **context.model_dump()},
        )
        raise JiraCreateError(f"Failed to create issue for Bug {bug.id}") from exc
    # Jira response can be of the form: List or Dictionary
    # if a list is returned, get the first item
    issue_data = response[0] if isinstance(response, list) else response
    logger.info(
        "Jira issue %s created for Bug %s",
        issue_data["key"],
        bug.id,
        extra={"response": response, **context.model_dump()},
    )
    return issue_data
def add_jira_comment(self, context: ActionContext):
    """Post a comment on the Jira issue describing the incoming event.

    For attachment events the comment says who did what to an attachment
    (the verb is the last dotted segment of the routing key). Otherwise the
    bug comment body is converted with `markdown_to_jira()` and prefixed
    with the commenter's login.

    Returns:
        The client's response to `issue_add_comment()`.
    """
    context = context.update(operation=Operation.COMMENT)
    event = context.event
    author = event.user.login if event.user else "unknown"

    if event.target != "attachment":
        bug_comment = context.bug.comment
        assert bug_comment  # See jbi.steps.create_comment()
        assert bug_comment.body  # Also see jbi.steps.create_comment()
        header = f"*{author}* commented: \n"
        # The header counts against the character limit too.
        body_budget = context.action.parameters.jira_char_limit - len(header)
        text = header + markdown_to_jira(bug_comment.body, max_length=body_budget)
    else:
        # The fallback value is only there to please type checking.
        key = event.routing_key or ".modify"
        _, action_verb = key.rsplit(".", 1)
        past_tense = {"modify": "modified"}.get(action_verb, f"{action_verb}d")
        text = f"*{author}* {past_tense} an attachment"

    issue_key = context.jira.issue
    response = self.client.issue_add_comment(issue_key=issue_key, comment=text)
    logger.info(
        "User comment added to Jira issue %s",
        issue_key,
        extra=context.model_dump(),
    )
    return response
def add_jira_comments_for_changes(self, context: ActionContext):
    """Post one JSON-formatted comment per relevant change of the event.

    A change of `status` or `resolution` yields a comment with the author
    and the bug's current resolution and status; a change of `assigned_to`
    or `assignee` yields a comment with the bug's assignee.

    Returns:
        The list of client responses, one per comment posted.
    """
    event = context.event
    bug = context.bug
    issue_key = context.jira.issue
    author = event.user.login if event.user else "unknown"

    payloads: list = []
    for change in event.changes or []:
        if change.field in ("status", "resolution"):
            status_payload = {
                "modified by": author,
                "resolution": bug.resolution,
                "status": bug.status,
            }
            payloads.append(status_payload)
        if change.field in ("assigned_to", "assignee"):
            payloads.append({"assignee": bug.assigned_to})

    responses = []
    for number, payload in enumerate(payloads, start=1):
        logger.info(
            "Create comment #%s on Jira issue %s",
            number,
            issue_key,
            extra=context.update(operation=Operation.COMMENT).model_dump(),
        )
        responses.append(
            self.client.issue_add_comment(
                issue_key=issue_key, comment=json.dumps(payload, indent=4)
            )
        )
    return responses
def delete_jira_issue_if_duplicate(
    self, context: ActionContext, latest_bug: bugzilla_models.Bug
):
    """Delete the Jira issue of `context` when the bug already links another one.

    The key found in the bug's see-also field (for the context's project)
    is compared with the context's issue key: the issue is deleted only
    when a key is found and it is a different one.

    Returns:
        The client's response to `delete_issue()`, or `None` when nothing
        was deleted.
    """
    created_key = context.jira.issue
    linked_key = latest_bug.extract_from_see_also(project_key=context.jira.project)
    # Nothing linked, or the link already points at this issue: keep it.
    if linked_key is None or created_key == linked_key:
        return None

    logger.warning(
        "Delete duplicated Jira issue %s from Bug %s",
        created_key,
        context.bug.id,
        extra=context.update(operation=Operation.DELETE).model_dump(),
    )
    return self.client.delete_issue(issue_id_or_key=created_key)
def add_link_to_bugzilla(self, context: ActionContext):
    """Attach a remote link to the Bugzilla bug on the Jira issue.

    The bug URL doubles as the link title, and the Bugzilla favicon URL is
    used both as the icon and as the icon title.

    Returns:
        The client's response to `create_or_update_issue_remote_links()`.
    """
    base_url = settings.bugzilla_base_url
    issue_key = context.jira.issue
    bug_url = f"{base_url}/show_bug.cgi?id={context.bug.id}"
    favicon_url = f"{base_url}/favicon.ico"

    logger.info(
        "Link %r on Jira issue %s",
        bug_url,
        issue_key,
        extra=context.update(operation=Operation.LINK).model_dump(),
    )
    return self.client.create_or_update_issue_remote_links(
        issue_key=issue_key,
        link_url=bug_url,
        title=bug_url,
        icon_url=favicon_url,
        icon_title=favicon_url,
    )
def clear_assignee(self, context: ActionContext):
    """Unassign the Jira issue referenced by `context`.

    Returns:
        The client's response to `update_issue_field()`.
    """
    target_key = context.jira.issue
    logger.info("Clearing assignee", extra=context.model_dump())
    unassigned = {"assignee": None}
    return self.client.update_issue_field(key=target_key, fields=unassigned)
def find_jira_user(self, context: ActionContext, email: str):
    """Look up the single Jira user matching `email`.

    Returns:
        The only entry returned by the client's user search.

    Raises:
        ValueError: if the search returns zero or several users.
    """
    logger.info("Find Jira user with email %s", email, extra=context.model_dump())
    matches = self.client.user_find_by_user_string(query=email)
    if len(matches) == 1:
        return matches[0]
    raise ValueError(f"User {email} not found")
def assign_jira_user(self, context: ActionContext, email: str):
    """Assign the Jira issue of `context` to the user matching `email`.

    Returns:
        The response of `update_issue_field()` for the `assignee` field.

    Raises:
        ValueError: if the user lookup fails, or if the update raises an
            HTTP or I/O error.
    """
    issue_key = context.jira.issue
    assert issue_key  # Until we have more fine-grained typing of contexts

    account_id = self.find_jira_user(context, email)["accountId"]
    try:
        # There doesn't appear to be an easy way to verify that this user
        # can be assigned to this issue, so just try and do it.
        return self.update_issue_field(
            context, "assignee", account_id, wrap_value="accountId"
        )
    except (requests_exceptions.HTTPError, IOError) as exc:
        failure = f"Could not assign {account_id} to issue {issue_key}"
        raise ValueError(failure) from exc
def update_issue_field(
    self,
    context: ActionContext,
    field: str,
    value: Any,
    wrap_value: Optional[str] = None,
):
    """Set one field of the Jira issue referenced by `context`.

    Args:
        context: action context; supplies the issue key and the bug id and
            is dumped into the `extra` of the log records.
        field: name of the Jira field to update.
        value: new value for the field.
        wrap_value: when given, the value is sent as `{wrap_value: value}`
            instead of the bare value (e.g. `{"name": ...}`).

    Returns:
        The client's response to `update_issue_field()`.
    """
    bug = context.bug
    issue_key = context.jira.issue
    # Every value goes through lazy %-style arguments: interpolating `field`
    # into the template with an f-string would break formatting if it ever
    # contained a `%`, and would make the template vary from call to call.
    logger.info(
        "Updating %s of Jira issue %s to %s for Bug %s",
        field,
        issue_key,
        value,
        bug.id,
        extra=context.model_dump(),
    )
    fields: dict[str, Any] = {field: {wrap_value: value} if wrap_value else value}
    response = self.client.update_issue_field(key=issue_key, fields=fields)
    logger.info(
        "Updated %s of Jira issue %s to %s for Bug %s",
        field,
        issue_key,
        value,
        bug.id,
        extra={"response": response, **context.model_dump()},
    )
    return response
def update_issue_status(self, context: ActionContext, jira_status: str):
    """Move the Jira issue of `context` to `jira_status`.

    When the status is "Cancelled", the request also sets the resolution
    to "Invalid" and adds an explanatory comment.

    Returns:
        The client's response to `set_issue_status()`.
    """
    issue_key = context.jira.issue
    assert issue_key  # Until we have more fine-grained typing of contexts

    extra_kwargs: dict[str, Any] = (
        {
            "fields": {"resolution": {"name": "Invalid"}},
            "update": {"comment": [{"add": {"body": "Issue was cancelled."}}]},
        }
        if jira_status == "Cancelled"
        else {}
    )
    logger.info(
        "Updating Jira status to %s",
        jira_status,
        extra=context.model_dump(),
    )
    return self.client.set_issue_status(issue_key, jira_status, **extra_kwargs)
def update_issue_summary(self, context: ActionContext):
    """Set the Jira issue summary from the bug summary.

    A summary longer than the action's `jira_char_limit` is cut at the limit
    and its last (possibly partial) word is dropped. A missing summary is
    sent as an empty string.

    Returns:
        The response of `update_issue_field()` for the `summary` field.
    """
    char_limit = context.action.parameters.jira_char_limit
    summary = context.bug.summary or ""
    if len(summary) > char_limit:
        # Truncate on last word.
        words = summary[:char_limit].rsplit(maxsplit=1)
        # `rsplit()` returns an empty list when the kept slice is empty or
        # only whitespace; indexing it unconditionally would raise IndexError.
        summary = words[0] if words else ""
    return self.update_issue_field(context, field="summary", value=summary)
def update_issue_resolution(self, context: ActionContext, jira_resolution: str):
    """Set the Jira issue's resolution to the one named `jira_resolution`.

    Returns:
        The response of `update_issue_field()`; the value is sent wrapped
        as `{"name": jira_resolution}`.
    """
    response = self.update_issue_field(
        context, field="resolution", value=jira_resolution, wrap_value="name"
    )
    return response
def update_issue_components(
    self,
    context: ActionContext,
    components: Iterable[str],
) -> tuple[Optional[dict], set]:
    """Set on the issue those requested components that exist in its project.

    Args:
        context: action context; supplies the Jira project whose
            components are listed, and the issue to update.
        components: names of the components to put on the issue.

    Returns:
        A pair: the response of the field update (`None` when no requested
        component exists in the project), and the set of requested names
        that were not found in the project.
    """
    unmatched = set(components)
    matched_ids = []
    for component in self.client.get_project_components(context.jira.project):
        if component["name"] not in unmatched:
            continue
        matched_ids.append({"id": component["id"]})
        unmatched.remove(component["name"])

    if matched_ids:
        response = self.update_issue_field(
            context, field="components", value=matched_ids
        )
        return response, unmatched
    return None, unmatched
def update_issue_labels(
    self,
    issue_key: str,
    add: Iterable[str],
    remove: Optional[Iterable[str]] = None,
):
    """Add and remove labels on a Jira issue in a single update.

    Args:
        issue_key: key of the issue whose labels are modified.
        add: labels to add.
        remove: labels to remove; may be omitted or `None` when there is
            nothing to remove.

    Returns:
        The client's response to `update_issue()`.
    """
    # Additions first, then removals, each as one label operation.
    label_operations = [{"add": label} for label in add]
    label_operations.extend({"remove": label} for label in remove or ())
    return self.client.update_issue(
        issue_key=issue_key,
        update={"update": {"labels": label_operations}},
    )
def check_jira_connection(self):
    """Health check: verify that the Jira server answers.

    Returns:
        An empty list on success, otherwise a single-item list holding a
        `checks.Error`: `jira.server.down` when the request raises, or
        `login.fail` when the server info comes back as `None`.
    """
    try:
        server_info = self.client.get_server_info(True)
    except requests.RequestException:
        return [checks.Error("Could not connect to server", id="jira.server.down")]
    if server_info is None:
        return [checks.Error("Login fails", id="login.fail")]
    return []
def check_jira_all_projects_are_visible(self, actions):
    """Health check: every configured Jira project must be visible.

    Returns:
        The connection check's messages if it fails; a `jira.visible.error`
        error if the projects cannot be fetched; a `jira.projects.missing`
        warning if some configured projects are not visible; otherwise an
        empty list.
    """
    # Do not bother executing the rest of checks if connection fails.
    connection_errors = self.check_jira_connection()
    if connection_errors:
        return connection_errors

    try:
        visible_projects = self.fetch_visible_projects()
    except requests.HTTPError:
        fetch_error = checks.Error(
            "Error fetching visible Jira projects", id="jira.visible.error"
        )
        return [fetch_error]

    hidden = actions.configured_jira_projects_keys - set(visible_projects)
    if not hidden:
        return []
    return [
        checks.Warning(
            f"Jira projects {hidden} are not visible with configured credentials",
            id="jira.projects.missing",
        )
    ]
def check_jira_all_projects_have_permissions(self, actions):
    """Health check: the required permissions must be granted on every configured project.

    Returns:
        The connection check's messages if it fails; a
        `jira.permitted.error` error if the permitted projects cannot be
        fetched; a `jira.permitted.missing` warning naming the projects
        that lack permissions; otherwise an empty list.
    """
    # Do not bother executing the rest of checks if connection fails.
    connection_errors = self.check_jira_connection()
    if connection_errors:
        return connection_errors

    try:
        permitted = self.client.permitted_projects(JIRA_REQUIRED_PERMISSIONS)
    except requests.HTTPError:
        fetch_error = checks.Error(
            "Error fetching permitted Jira projects", id="jira.permitted.error"
        )
        return [fetch_error]

    permitted_keys = {entry["key"] for entry in permitted}
    lacking = actions.configured_jira_projects_keys - permitted_keys
    if not lacking:
        return []
    missing = ", ".join(lacking)
    return [
        checks.Warning(
            f"Missing permissions for projects {missing}",
            id="jira.permitted.missing",
        )
    ]
def check_jira_all_project_custom_components_exist(self, actions):
    """Health check: custom components configured on actions must exist in Jira.

    Each action that sets custom components is checked with
    `_check_project_components()`, the checks being run concurrently in a
    thread pool.

    Returns:
        The connection check's messages if it fails, otherwise the
        concatenated messages of the per-action checks, in completion order.
    """
    # Imported here because a bare `import concurrent` does not load the
    # `futures` submodule; `concurrent.futures` would then resolve only if
    # some other module happened to import it first.
    from concurrent.futures import ThreadPoolExecutor, as_completed

    # Do not bother executing the rest of checks if connection fails.
    if messages := self.check_jira_connection():
        return messages

    results = []
    with ThreadPoolExecutor() as executor:
        futures = [
            executor.submit(self._check_project_components, action)
            for action in actions
            if action.parameters.jira_components.set_custom_components
        ]
        for future in as_completed(futures):
            results.extend(future.result())
    return results
def _check_project_components(self, action):
    """Check that the custom components of one action exist in its Jira project.

    Returns:
        A single-item list holding a `jira.components.error` error (the
        components could not be fetched), a `jira.components.parsing` error
        (a component entry has no `"name"`), or a `jira.components.missing`
        warning (some configured components are unknown); an empty list
        when every configured component exists.
    """
    project_key = action.parameters.jira_project_key
    expected = set(action.parameters.jira_components.set_custom_components)

    try:
        project_components = self.client.get_project_components(project_key)
    except requests.HTTPError:
        fetch_error = checks.Error(
            f"Error checking project components for {project_key}",
            id="jira.components.error",
        )
        return [fetch_error]

    try:
        available = {component["name"] for component in project_components}
    except KeyError:
        parsing_error = checks.Error(
            f"Unexpected get_project_components response for {action.whiteboard_tag}",
            id="jira.components.parsing",
        )
        return [parsing_error]

    unknown = expected - available
    if not unknown:
        return []
    return [
        checks.Warning(
            f"Jira project {project_key} does not have components {unknown}",
            id="jira.components.missing",
        )
    ]
def check_jira_all_project_issue_types_exist(self, actions):
    """Health check: issue types mapped by the actions must exist in their projects.

    Returns:
        The connection check's messages if it fails; a
        `jira.projects.error` error if the projects cannot be fetched; a
        `jira.types.missing` warning (carrying the missing issue types per
        project as `obj`) if some are missing; otherwise an empty list.
    """
    # Do not bother executing the rest of checks if connection fails.
    if messages := self.check_jira_connection():
        return messages
    try:
        paginated_project_response = self.client.paginated_projects(
            expand="issueTypes", keys=sorted(actions.configured_jira_projects_keys)
        )
    except requests.RequestException:
        return [
            checks.Error(
                "Couldn't fetch projects",
                id="jira.projects.error",
            )
        ]
    projects = paginated_project_response["values"]
    issue_types_by_project = {
        project["key"]: {issue_type["name"] for issue_type in project["issueTypes"]}
        for project in projects
    }
    missing_issue_types_by_project: dict[str, set] = {}
    for action in actions:
        action_issue_types = set(action.parameters.issue_type_map.values())
        project_issue_types = issue_types_by_project.get(
            action.jira_project_key, set()
        )
        if missing_issue_types := action_issue_types - project_issue_types:
            # Several actions may target the same project: accumulate their
            # missing types rather than letting the last action overwrite
            # what earlier ones reported.
            missing_issue_types_by_project.setdefault(
                action.jira_project_key, set()
            ).update(missing_issue_types)
    if missing_issue_types_by_project:
        return [
            checks.Warning(
                f"Jira projects {set(missing_issue_types_by_project.keys())} with missing issue types",
                obj=missing_issue_types_by_project,
                id="jira.types.missing",
            )
        ]
    return []
def check_jira_pandoc_install(self):
    """Health check: `markdown_to_jira()` must convert a sample list item.

    Returns:
        An empty list when "- Test" converts to "* Test", otherwise a
        single-item list holding a `jira.pandoc` error.
    """
    converted = markdown_to_jira("- Test")
    if converted == "* Test":
        return []
    return [checks.Error("Pandoc conversion failed", id="jira.pandoc")]
@lru_cache(maxsize=1)
def get_service():
    """Build the `JiraService` bound to the configured Jira instance.

    The result is memoized by `lru_cache`, so repeated calls return the
    same service object.
    """
    jira_client = JiraClient(
        url=settings.jira_base_url,
        username=settings.jira_username,
        # The package calls this parameter `password` but actually expects
        # an API key.
        password=settings.jira_api_key,
        # We run against an instance of Jira cloud.
        cloud=True,
    )
    service = JiraService(client=jira_client)
    return service