in src/dispatch/case/messaging.py [0:0]
def send_case_update_notifications(case: Case, previous_case: CaseRead, db_session: Session):
    """Sends notifications about case changes.

    Compares the updated case against its previous state and, when something
    relevant changed, sends a message to the case conversation (unless the
    case is closed) and hands a notification to the notification service.

    A status change always triggers a notification. When the status did not
    change, type/severity/priority changes only trigger one if the case is
    not closed.

    Args:
        case: The case after the update.
        previous_case: Snapshot of the case before the update.
        db_session: Database session passed to the plugin and notification
            services.
    """
    notification_text = "Case Notification"
    notification_type = MessageType.case_notification
    notification_template = CASE_NOTIFICATION_COMMON.copy()

    status_changed = previous_case.status != case.status
    case_is_open = case.status != CaseStatus.closed

    if status_changed:
        notification_template.append(CASE_STATUS_CHANGE)

    # type/severity/priority are only inspected when the status changed or the
    # case is still open; an unchanged closed case never reports them
    field_changed = False
    if status_changed or case_is_open:
        tracked_fields = (
            (previous_case.case_type.name, case.case_type.name, CASE_TYPE_CHANGE),
            (previous_case.case_severity.name, case.case_severity.name, CASE_SEVERITY_CHANGE),
            (previous_case.case_priority.name, case.case_priority.name, CASE_PRIORITY_CHANGE),
        )
        for old_value, new_value, change_template in tracked_fields:
            if old_value != new_value:
                field_changed = True
                notification_template.append(change_template)

    if not (status_changed or field_changed):
        # we don't need to send notifications
        log.debug("Case updated notifications not sent. No changes were made.")
        return

    notification_template.append(CASE_ASSIGNEE)

    # keyword arguments shared by the case conversation message and the
    # broadcast notification; built once so both stay in sync
    shared_kwargs = {
        "assignee_fullname": case.assignee.individual.name,
        "assignee_team": case.assignee.team,
        "assignee_weblink": case.assignee.individual.weblink,
        "case_priority_new": case.case_priority.name,
        "case_priority_old": previous_case.case_priority.name,
        "case_severity_new": case.case_severity.name,
        "case_severity_old": previous_case.case_severity.name,
        "case_status_new": case.status,
        "case_status_old": previous_case.status,
        "case_type_new": case.case_type.name,
        "case_type_old": previous_case.case_type.name,
        "name": case.name,
        # resolved via resolve_attr for both destinations; presumably this
        # tolerates a case without a ticket -- confirm against the helper
        "ticket_weblink": resolve_attr(case, "ticket.weblink"),
        "title": case.title,
        "escalated_to_incident": case.incidents[0] if case.incidents else None,
    }

    # we send an update to the case conversation if the case is active or stable
    if case_is_open:
        case_conversation_notification_template = notification_template.copy()
        case_conversation_notification_template.insert(0, CASE_NAME)

        convo_plugin = plugin_service.get_active_instance(
            db_session=db_session, project_id=case.project.id, plugin_type="conversation"
        )
        if convo_plugin:
            # NOTE(review): assumes case.conversation is set whenever a
            # conversation plugin is active -- confirm
            convo_plugin.instance.send(
                case.conversation.channel_id,
                notification_text,
                case_conversation_notification_template,
                notification_type,
                **shared_kwargs,
            )
        else:
            log.debug(
                "Case updated notification not sent to case conversation. No conversation plugin enabled."  # noqa
            )

    # we send a notification to the notification conversations and emails
    fyi_notification_template = notification_template.copy()
    if not case_is_open:
        fyi_header = CASE_NAME
    elif case.project.allow_self_join:
        fyi_header = CASE_NAME_WITH_ENGAGEMENT
    else:
        fyi_header = CASE_NAME_WITH_ENGAGEMENT_NO_SELF_JOIN
    fyi_notification_template.insert(0, fyi_header)

    notification_kwargs = {
        **shared_kwargs,
        "contact_fullname": case.assignee.individual.name,
        "contact_weblink": case.assignee.individual.weblink,
        "case_id": case.id,
        "organization_slug": case.project.organization.slug,
    }

    notification_params = {
        "text": notification_text,
        "type": notification_type,
        "template": fyi_notification_template,
        "kwargs": notification_kwargs,
    }

    notification_service.filter_and_send(
        db_session=db_session,
        project_id=case.project.id,
        class_instance=case,
        notification_params=notification_params,
    )

    log.debug("Case updated notifications sent.")