in src/dispatch/incident/flows.py [0:0]
def incident_closed_status_flow(incident: Incident, db_session=None):
"""Runs the incident closed flow."""
# we inactivate all participants
inactivate_incident_participants(incident, db_session)
# we set the closed time
incident.closed_at = datetime.utcnow()
db_session.add(incident)
db_session.commit()
# we archive the conversation
conversation_flows.archive_conversation(subject=incident, db_session=db_session)
if incident.visibility == Visibility.open:
storage_plugin = plugin_service.get_active_instance(
db_session=db_session, project_id=incident.project.id, plugin_type="storage"
)
if storage_plugin:
if storage_plugin.configuration.open_on_close:
for document in [incident.incident_document, incident.incident_review_document]:
document_flows.open_document_access(document=document, db_session=db_session)
if storage_plugin.configuration.read_only:
for document in [incident.incident_document, incident.incident_review_document]:
document_flows.mark_document_as_readonly(
document=document, db_session=db_session
)
for case in incident.cases:
try:
case.resolution = (
f"Closed as part of incident {incident.name}. See incident for more details."
)
case.resolution_reason = CaseResolutionReason.escalated
case.status = CaseStatus.closed
case_flows.case_closed_status_flow(case=case, db_session=db_session)
except Exception as e:
log.exception(
f"Failed to close case {case.name} while closing incident {incident.name}. Error: {str(e)}"
)
# we send a direct message to the incident commander asking to review
# the incident's information and to tag the incident if appropriate
send_incident_closed_information_review_reminder(incident, db_session)
# we send a direct message to all participants asking them
# to rate and provide feedback about the incident
send_incident_rating_feedback_message(incident, db_session)
# if an AI plugin is enabled, we send the incident review doc for summary
ai_service.generate_incident_summary(incident=incident, db_session=db_session)