in src/dispatch/incident/scheduled.py [0:0]
def incident_report_daily(db_session: Session, project: Project):
    """Creates and sends incident daily reports based on notifications.

    Incidents of the project are collected (active ones via
    ``get_all_by_status``; stable and closed ones via
    ``get_all_last_x_hours_by_status`` with ``hours=24``) and matched against
    the search filters of every notification returned by
    ``notification_service.get_all_enabled``. One report is then sent per
    (notification, search filter) pair that matched at least one incident, so
    an incident matched by several filters of the same notification shows up
    in several reports. A notification without any filters receives a single
    report containing all collected incidents.

    Args:
        db_session: Database session passed to every service call.
        project: Project to report on. Only an explicit ``False`` in
            ``project.send_daily_reports`` disables the report.

    Returns:
        None.
    """
    # don't send if set to false (identity check: any other value still sends)
    if project.send_daily_reports is False:
        return

    # we fetch all active, stable and closed incidents
    active_incidents = get_all_by_status(
        db_session=db_session, project_id=project.id, status=IncidentStatus.active
    )
    stable_incidents = get_all_last_x_hours_by_status(
        db_session=db_session,
        project_id=project.id,
        status=IncidentStatus.stable,
        hours=24,
    )
    closed_incidents = get_all_last_x_hours_by_status(
        db_session=db_session,
        project_id=project.id,
        status=IncidentStatus.closed,
        hours=24,
    )
    incidents = active_incidents + stable_incidents + closed_incidents

    # we map incidents to notification filters:
    # notification id -> search filter id -> list of matching incidents
    incidents_notification_filters_mapping = defaultdict(lambda: defaultdict(list))
    notifications = notification_service.get_all_enabled(
        db_session=db_session, project_id=project.id
    )
    for incident in incidents:
        for notification in notifications:
            if not notification.filters:
                # no filters to match against: every incident is reported
                # under the placeholder filter id 0
                incidents_notification_filters_mapping[notification.id][0].append(incident)
                continue

            for search_filter in notification.filters:
                match = search_filter_service.match(
                    db_session=db_session,
                    subject=search_filter.subject,
                    filter_spec=search_filter.expression,
                    class_instance=incident,
                )
                if match:
                    incidents_notification_filters_mapping[notification.id][
                        search_filter.id
                    ].append(incident)

    # the title only depends on the project, so it is built once
    notification_title_text = f"{project.name} {INCIDENT_DAILY_REPORT_TITLE}"

    # we create and send an incidents daily report for each notification filter
    for notification_id, search_filter_dict in incidents_notification_filters_mapping.items():
        # looked up once per notification instead of once per search filter
        notification = notification_service.get(
            db_session=db_session, notification_id=notification_id
        )

        for filtered_incidents in search_filter_dict.values():
            items_grouped = []
            for idx, incident in enumerate(filtered_incidents):
                try:
                    items_grouped.append(_build_daily_report_item(incident, idx))
                except Exception as e:
                    # best effort: an incident that cannot be rendered is
                    # logged and left out of the report; the rest is still sent
                    log.exception(e)

            notification_params = {
                "text": notification_title_text,
                "type": MessageType.incident_daily_report,
                "template": INCIDENT_DAILY_REPORT,
                "kwargs": {
                    "items_grouped": items_grouped,
                    "items_grouped_template": INCIDENT,
                },
            }

            notification_service.send(
                db_session=db_session,
                project_id=notification.project.id,
                notification=notification,
                notification_params=notification_params,
            )


def _build_daily_report_item(incident, idx: int) -> dict:
    """Builds the template payload describing one incident in a daily report.

    Args:
        incident: Incident to describe.
        idx: Position of the incident within its report. It is appended to
            each button action (presumably to keep the action ids unique
            within one message -- TODO confirm).

    Returns:
        A dict with the incident details and a ``buttons`` list. Incidents
        that are not closed get a "Subscribe" button and, when the incident's
        project has ``allow_self_join`` set, a "Join" button as well.
    """
    item = {
        "buttons": [],
        "commander_fullname": incident.commander.individual.name,
        "commander_team": incident.commander.team,
        "commander_weblink": incident.commander.individual.weblink,
        "incident_id": incident.id,
        "name": incident.name,
        "organization_slug": incident.project.organization.slug,
        "priority": incident.incident_priority.name,
        "priority_description": incident.incident_priority.description,
        "severity": incident.incident_severity.name,
        "severity_description": incident.incident_severity.description,
        "status": incident.status,
        "ticket_weblink": resolve_attr(incident, "ticket.weblink"),
        "title": incident.title,
        "type": incident.incident_type.name,
        "type_description": incident.incident_type.description,
    }

    if incident.status != IncidentStatus.closed:
        item["buttons"].append(
            {
                "button_text": "Subscribe",
                "button_value": f"{incident.project.organization.slug}-{incident.id}",
                "button_action": f"{ConversationButtonActions.subscribe_user}-{incident.status}-{idx}",
            }
        )
        if incident.project.allow_self_join:
            item["buttons"].append(
                {
                    "button_text": "Join",
                    "button_value": f"{incident.project.organization.slug}-{incident.id}",
                    "button_action": f"{ConversationButtonActions.invite_user}-{incident.status}-{idx}",
                }
            )

    return item