in src/dispatch/incident/scheduled.py [0:0]
def incident_report_weekly(db_session: Session, project: Project):
"""Creates and sends incident weekly reports based on notifications."""
# don't send if set to false or no notification id is set
if project.send_weekly_reports is False or not project.weekly_report_notification_id:
return
# don't send if no enabled ai plugin
ai_plugin = plugin_service.get_active_instance(
db_session=db_session, plugin_type="artificial-intelligence", project_id=project.id
)
if not ai_plugin:
log.warning("Incident weekly reports not sent. No AI plugin enabled.")
return
# we fetch all closed incidents in the last week
incidents = get_all_last_x_hours_by_status(
db_session=db_session,
project_id=project.id,
status=IncidentStatus.closed,
hours=24 * 7,
)
storage_plugin = plugin_service.get_active_instance(
db_session=db_session, plugin_type="storage", project_id=project.id
)
if not storage_plugin:
log.warning(
f"Incident weekly reports not sent. No storage plugin enabled. Project: {project.name}."
)
return
items_grouped = []
items_grouped_template = INCIDENT_SUMMARY_TEMPLATE
# we create and send an incidents weekly report
for incident in incidents:
# Skip if no incident review document
if (
not incident.incident_review_document
or not incident.incident_review_document.resource_id
):
continue
# Skip restricted incidents
if incident.visibility == Visibility.restricted:
continue
# Skip if incident is a duplicate
if incident.duplicates:
continue
try:
# if already summary generated, use that instead
if incident.summary:
summary = incident.summary
else:
summary = ai_service.generate_incident_summary(
db_session=db_session, incident=incident
)
item = {
"commander_fullname": incident.commander.individual.name,
"commander_team": incident.commander.team,
"commander_weblink": incident.commander.individual.weblink,
"name": incident.name,
"ticket_weblink": resolve_attr(incident, "ticket.weblink"),
"title": incident.title,
"summary": summary,
}
items_grouped.append(item)
except Exception as e:
log.exception(e)
template = INCIDENT_WEEKLY_REPORT
# if no closed incidents or all closed incidents are restricted, send a different template
if not items_grouped:
template = INCIDENT_WEEKLY_REPORT_NO_INCIDENTS
notification_kwargs = {
"items_grouped": items_grouped,
"items_grouped_template": items_grouped_template,
}
notification_title_text = f"{project.name} {INCIDENT_WEEKLY_REPORT_TITLE}"
notification_params = {
"text": notification_title_text,
"type": MessageType.incident_weekly_report,
"template": template,
"kwargs": notification_kwargs,
}
notification = notification_service.get(
db_session=db_session, notification_id=project.weekly_report_notification_id
)
notification_service.send(
db_session=db_session,
project_id=notification.project.id,
notification=notification,
notification_params=notification_params,
)