in src/dispatch/incident/service.py [0:0]
def create(*, db_session: Session, incident_in: IncidentCreate) -> Incident:
    """Creates a new incident and attaches its initial participants.

    The project, incident type, priority and severity are looked up through
    their services' ``get_by_name_or_default`` helpers. The incident is added
    to the session and committed, an "Incident created" event is logged, and
    participants are then added in this order: reporter, commander, liaison,
    scribe and, when the commander role has ``engage_next_oncall`` set, the
    next on-call as observer.

    A missing reporter is tolerated: the event is logged without an individual
    and an empty owner, no reporter participant is added, and the commander is
    only added if an email was provided or resolved for that role.

    Args:
        db_session: Database session used for every lookup and write.
        incident_in: Data describing the incident to create.

    Returns:
        The newly created incident.
    """
    project = project_service.get_by_name_or_default(
        db_session=db_session, project_in=incident_in.project
    )
    incident_type = incident_type_service.get_by_name_or_default(
        db_session=db_session,
        project_id=project.id,
        incident_type_in=incident_in.incident_type,
    )
    incident_priority = incident_priority_service.get_by_name_or_default(
        db_session=db_session,
        project_id=project.id,
        incident_priority_in=incident_in.incident_priority,
    )
    incident_severity = incident_severity_service.get_by_name_or_default(
        db_session=db_session,
        project_id=project.id,
        incident_severity_in=incident_in.incident_severity,
    )

    # a visibility supplied with the request overrides the incident type's one
    visibility = incident_in.visibility or incident_type.visibility

    tags = [
        tag_service.get_or_create(db_session=db_session, tag_in=tag_in)
        for tag_in in incident_in.tags
    ]

    # We create the incident
    incident = Incident(
        description=incident_in.description,
        incident_priority=incident_priority,
        incident_severity=incident_severity,
        incident_type=incident_type,
        project=project,
        status=incident_in.status,
        tags=tags,
        title=incident_in.title,
        visibility=visibility,
    )
    db_session.add(incident)
    db_session.commit()

    # Resolve the reporter once so that its name, id and email are all guarded
    # the same way; dereferencing a missing reporter here would raise after the
    # incident has already been committed.
    reporter = incident_in.reporter.individual if incident_in.reporter else None
    if reporter is None:
        log.warning("Incident created without a reporter; no reporter participant added.")
    reporter_name = reporter.name if reporter is not None else ""
    # NOTE(review): assumes log_incident_event accepts individual_id=None - confirm
    reporter_id = reporter.id if reporter is not None else None
    reporter_email = reporter.email if reporter is not None else None

    event_service.log_incident_event(
        db_session=db_session,
        source="Dispatch Core App",
        description="Incident created",
        details={
            "title": incident.title,
            "description": incident.description,
            "type": incident.incident_type.name,
            "severity": incident.incident_severity.name,
            "priority": incident.incident_priority.name,
            "status": incident.status,
            "visibility": incident.visibility,
        },
        individual_id=reporter_id,
        incident_id=incident.id,
        owner=reporter_name,
        pinned=True,
    )

    # add reporter
    if reporter is not None:
        participant_flows.add_participant(
            reporter_email,
            incident,
            db_session,
            roles=[ParticipantRoleType.reporter],
        )

    # add commander
    commander_email = commander_service_id = None
    if incident_in.commander_email:
        commander_email = incident_in.commander_email
    elif incident_in.commander:
        commander_email = incident_in.commander.individual.email
    else:
        commander_email, commander_service_id = resolve_and_associate_role(
            db_session=db_session,
            incident=incident,
            role=ParticipantRoleType.incident_commander,
        )

    if not commander_email:
        # we make the reporter the commander if an email for the commander
        # was not provided or resolved via incident role policies
        commander_email = reporter_email

    # skipped only when there is neither a commander email nor a reporter to
    # fall back on
    if commander_email or reporter is not None:
        participant_flows.add_participant(
            commander_email,
            incident,
            db_session,
            service_id=commander_service_id,
            roles=[ParticipantRoleType.incident_commander],
        )

    # add liaison and scribe; each is only added if its email can be resolved
    _add_resolved_role_participant(
        db_session=db_session, incident=incident, role=ParticipantRoleType.liaison
    )
    _add_resolved_role_participant(
        db_session=db_session, incident=incident, role=ParticipantRoleType.scribe
    )

    # add observer (if engage_next_oncall is enabled)
    incident_role = resolve_role(
        db_session=db_session,
        role=ParticipantRoleType.incident_commander,
        incident=incident,
    )
    if incident_role and incident_role.engage_next_oncall:
        oncall_plugin = plugin_service.get_active_instance(
            db_session=db_session, project_id=incident.project.id, plugin_type="oncall"
        )
        if not oncall_plugin:
            log.debug("Resolved observer role not available since oncall plugin is not active.")
        else:
            oncall_email = oncall_plugin.instance.get_next_oncall(
                service_id=incident_role.service.external_id
            )
            # no need to add as observer if already added as commander
            if oncall_email and commander_email != oncall_email:
                participant_flows.add_participant(
                    oncall_email,
                    incident,
                    db_session,
                    service_id=incident_role.service.id,
                    roles=[ParticipantRoleType.observer],
                )

    return incident


def _add_resolved_role_participant(
    *, db_session: Session, incident: Incident, role: ParticipantRoleType
) -> None:
    """Adds a participant for ``role`` if its email resolves via incident role policies."""
    email, service_id = resolve_and_associate_role(
        db_session=db_session, incident=incident, role=role
    )
    if email:
        participant_flows.add_participant(
            email,
            incident,
            db_session,
            service_id=service_id,
            roles=[role],
        )