in src/dispatch/incident/messaging.py [0:0]
def send_incident_created_notifications(incident: Incident, db_session: Session):
    """Builds the incident created notification and passes it on for delivery.

    The message template is a copy of ``INCIDENT_NOTIFICATION`` with a name
    block prepended. Which name block is used depends on whether the incident
    is closed and, when it is not, on the project's ``allow_self_join`` flag.
    Descriptions longer than 500 characters are cut to 500 characters and
    suffixed with ``...``.

    Args:
        incident: The incident the notification is about.
        db_session: Database session forwarded to the document, notification
            and event services.
    """
    template = INCIDENT_NOTIFICATION.copy()

    # Pick the block that goes at the very top of the message.
    if incident.status != IncidentStatus.closed:
        name_block = (
            INCIDENT_NAME_WITH_ENGAGEMENT
            if incident.project.allow_self_join
            else INCIDENT_NAME_WITH_ENGAGEMENT_NO_SELF_JOIN
        )
    else:
        name_block = INCIDENT_NAME
    template.insert(0, name_block)

    # Keep the description short: anything past 500 characters is elided.
    description = incident.description
    if len(description) > 500:
        description = f"{description[:500]}..."

    reporter = incident.reporter
    commander = incident.commander

    message_kwargs = {
        "name": incident.name,
        "title": incident.title,
        "description": description,
        "visibility": incident.visibility,
        "status": incident.status,
        "type": incident.incident_type.name,
        "type_description": incident.incident_type.description,
        "severity": incident.incident_severity.name,
        "severity_description": incident.incident_severity.description,
        "priority": incident.incident_priority.name,
        "priority_description": incident.incident_priority.description,
        "reporter_fullname": reporter.individual.name,
        "reporter_team": reporter.team,
        "reporter_weblink": reporter.individual.weblink,
        "commander_fullname": commander.individual.name,
        "commander_team": commander.team,
        "commander_weblink": commander.individual.weblink,
        "document_weblink": resolve_attr(incident, "incident_document.weblink"),
        "storage_weblink": resolve_attr(incident, "storage.weblink"),
        "ticket_weblink": resolve_attr(incident, "ticket.weblink"),
        "conference_weblink": resolve_attr(incident, "conference.weblink"),
        "conference_challenge": resolve_attr(incident, "conference.conference_challenge"),
        # The contact entries reuse the commander's details.
        "contact_fullname": commander.individual.name,
        "contact_weblink": commander.individual.weblink,
        "incident_id": incident.id,
        "organization_slug": incident.project.organization.slug,
    }

    # The FAQ link is only included when the lookup returns a document.
    faq_document = document_service.get_incident_faq_document(
        db_session=db_session, project_id=incident.project_id
    )
    if faq_document:
        message_kwargs["faq_weblink"] = faq_document.weblink

    notification_service.filter_and_send(
        db_session=db_session,
        project_id=incident.project.id,
        class_instance=incident,
        notification_params={
            "text": "Incident Notification",
            "type": MessageType.incident_notification,
            "template": template,
            "kwargs": message_kwargs,
        },
    )

    # Record on the incident's event log that notifications went out.
    event_service.log_incident_event(
        db_session=db_session,
        source="Dispatch Core App",
        description="Incident notifications sent",
        incident_id=incident.id,
    )

    log.debug("Incident created notifications sent.")