def create()

in src/dispatch/incident/service.py [0:0]


def create(*, db_session: Session, incident_in: IncidentCreate) -> Incident:
    """Creates a new incident."""
    project = project_service.get_by_name_or_default(
        db_session=db_session, project_in=incident_in.project
    )

    incident_type = incident_type_service.get_by_name_or_default(
        db_session=db_session, project_id=project.id, incident_type_in=incident_in.incident_type
    )

    incident_priority = incident_priority_service.get_by_name_or_default(
        db_session=db_session,
        project_id=project.id,
        incident_priority_in=incident_in.incident_priority,
    )

    incident_severity = incident_severity_service.get_by_name_or_default(
        db_session=db_session,
        project_id=project.id,
        incident_severity_in=incident_in.incident_severity,
    )

    visibility = incident_type.visibility
    if incident_in.visibility:
        visibility = incident_in.visibility

    tag_objs = []
    for t in incident_in.tags:
        tag_objs.append(tag_service.get_or_create(db_session=db_session, tag_in=t))

    # We create the incident
    incident = Incident(
        description=incident_in.description,
        incident_priority=incident_priority,
        incident_severity=incident_severity,
        incident_type=incident_type,
        project=project,
        status=incident_in.status,
        tags=tag_objs,
        title=incident_in.title,
        visibility=visibility,
    )

    db_session.add(incident)
    db_session.commit()

    reporter_name = incident_in.reporter.individual.name if incident_in.reporter else ""

    event_service.log_incident_event(
        db_session=db_session,
        source="Dispatch Core App",
        description="Incident created",
        details={
            "title": incident.title,
            "description": incident.description,
            "type": incident.incident_type.name,
            "severity": incident.incident_severity.name,
            "priority": incident.incident_priority.name,
            "status": incident.status,
            "visibility": incident.visibility,
        },
        individual_id=incident_in.reporter.individual.id,
        incident_id=incident.id,
        owner=reporter_name,
        pinned=True,
    )

    # add reporter
    reporter_email = incident_in.reporter.individual.email
    participant_flows.add_participant(
        reporter_email,
        incident,
        db_session,
        roles=[ParticipantRoleType.reporter],
    )

    # add commander
    commander_email = commander_service_id = None
    if incident_in.commander_email:
        commander_email = incident_in.commander_email
    elif incident_in.commander:
        commander_email = incident_in.commander.individual.email
    else:
        commander_email, commander_service_id = resolve_and_associate_role(
            db_session=db_session, incident=incident, role=ParticipantRoleType.incident_commander
        )

    if not commander_email:
        # we make the reporter the commander if an email for the commander
        # was not provided or resolved via incident role policies
        commander_email = reporter_email

    participant_flows.add_participant(
        commander_email,
        incident,
        db_session,
        service_id=commander_service_id,
        roles=[ParticipantRoleType.incident_commander],
    )

    # add liaison
    liaison_email, liaison_service_id = resolve_and_associate_role(
        db_session=db_session, incident=incident, role=ParticipantRoleType.liaison
    )

    if liaison_email:
        # we only add the liaison if we are able to resolve its email
        # via incident role policies
        participant_flows.add_participant(
            liaison_email,
            incident,
            db_session,
            service_id=liaison_service_id,
            roles=[ParticipantRoleType.liaison],
        )

    # add scribe
    scribe_email, scribe_service_id = resolve_and_associate_role(
        db_session=db_session, incident=incident, role=ParticipantRoleType.scribe
    )

    if scribe_email:
        # we only add the scribe if we are able to resolve its email
        # via incident role policies
        participant_flows.add_participant(
            scribe_email,
            incident,
            db_session,
            service_id=scribe_service_id,
            roles=[ParticipantRoleType.scribe],
        )

    # add observer (if engage_next_oncall is enabled)
    incident_role = resolve_role(
        db_session=db_session, role=ParticipantRoleType.incident_commander, incident=incident
    )
    if incident_role and incident_role.engage_next_oncall:
        oncall_plugin = plugin_service.get_active_instance(
            db_session=db_session, project_id=incident.project.id, plugin_type="oncall"
        )
        if not oncall_plugin:
            log.debug("Resolved observer role not available since oncall plugin is not active.")
        else:
            oncall_email = oncall_plugin.instance.get_next_oncall(
                service_id=incident_role.service.external_id
            )
            # no need to add as observer if already added as commander
            if oncall_email and commander_email != oncall_email:
                participant_flows.add_participant(
                    oncall_email,
                    incident,
                    db_session,
                    service_id=incident_role.service.id,
                    roles=[ParticipantRoleType.observer],
                )

    return incident