in src/dispatch/task/service.py [0:0]
def create(*, db_session, task_in: TaskCreate) -> Task:
    """Create a new task on an incident and log an incident event for it.

    Every person referenced by the task (assignees, creator, owner) is passed
    through ``incident_flows.incident_add_or_reactivate_participant_flow`` so
    that they end up as a participant of the incident.

    Args:
        db_session: Database session used for lookups and to persist the task.
        task_in: Task payload. ``task_in.incident.id`` selects the incident.
            ``creator``, ``owner`` and ``weblink`` may be empty: the creator
            defaults to the owner and then to the incident commander, the
            owner defaults to the incident commander, and the weblink defaults
            to the organization's task list in the Dispatch UI.

    Returns:
        The newly created ``Task`` after it has been added and committed.

    Raises:
        ValueError: If no creator can be determined, i.e. the supplied
            creator/owner email cannot be resolved to a participant and the
            incident has no commander to fall back on.
    """
    incident = incident_service.get(db_session=db_session, incident_id=task_in.incident.id)

    assignees = []
    for i in task_in.assignees:
        # look the participant up *before* running the flow below, so we can tell
        # whether this person was already an active participant of the incident
        participant = participant_service.get_by_incident_id_and_email(
            db_session=db_session, incident_id=incident.id, email=i.individual.email
        )

        assignee = incident_flows.incident_add_or_reactivate_participant_flow(
            db_session=db_session,
            incident_id=incident.id,
            user_email=i.individual.email,
        )

        if not participant or not participant.active_roles:
            # send ephemeral message to user about why they are being added to the incident
            send_task_add_ephemeral_message(
                assignee_email=i.individual.email,
                incident=incident,
                db_session=db_session,
                task=task_in,
            )

        # due to the freeform nature of task assignment, we can sometimes pick up other emails
        # e.g. a google group that we cannot resolve to an individual assignee
        if assignee:
            assignees.append(assignee)

    # the creator defaults to the owner when not given explicitly; both are optional,
    # so neither may be dereferenced unconditionally
    creator_source = task_in.creator or task_in.owner

    creator = None
    if creator_source:
        # add creator as a participant if they are not one already
        creator = incident_flows.incident_add_or_reactivate_participant_flow(
            db_session=db_session,
            incident_id=incident.id,
            user_email=creator_source.individual.email,
        )

    # no creator/owner supplied, or the email could not be resolved to a participant:
    # fall back to the incident commander, mirroring the owner default below
    if not creator:
        creator = incident.commander

    if not creator:
        raise ValueError(
            f"Unable to determine a creator for the task on incident {incident.id}."
        )

    # if we cannot find any assignees, the creator becomes the default assignee
    if not assignees:
        assignees.append(creator)

    # we add owner as a participant if they are not one already
    if task_in.owner:
        owner = incident_flows.incident_add_or_reactivate_participant_flow(
            db_session=db_session,
            incident_id=incident.id,
            user_email=task_in.owner.individual.email,
        )
    else:
        owner = incident.commander

    # set a reasonable default weblink for tasks
    weblink = f"{DISPATCH_UI_URL}/{incident.project.organization.name}/tasks"
    if task_in.weblink:
        weblink = task_in.weblink

    # the excluded fields are replaced by the resolved objects computed above
    task = Task(
        **task_in.dict(exclude={"assignees", "weblink", "owner", "incident", "creator"}),
        creator=creator,
        owner=owner,
        assignees=assignees,
        incident=incident,
        weblink=weblink,
    )

    event_service.log_incident_event(
        db_session=db_session,
        source="Dispatch Core App",
        description=f"New incident task created by {creator.individual.name}",
        individual_id=creator.individual.id,
        details={"weblink": task.weblink},
        incident_id=incident.id,
        owner=creator.individual.name,
    )

    db_session.add(task)
    db_session.commit()
    return task