in src/dispatch/task/service.py [0:0]
def update(*, db_session, task: Task, task_in: TaskUpdate, sync_external: bool = True) -> Task:
"""Update an existing task."""
# we add the assignees of the task to the incident if the status of the task is open
if task_in.status == TaskStatus.open:
# we don't allow a task to be unassigned
if task_in.assignees:
assignees = []
for i in task_in.assignees:
assignees.append(
incident_flows.incident_add_or_reactivate_participant_flow(
db_session=db_session,
incident_id=task.incident.id,
user_email=i.individual.email,
)
)
task.assignees = assignees
# we add the owner of the task to the incident if the status of the task is open
if task_in.owner:
if task_in.status == TaskStatus.open:
task.owner = incident_flows.incident_add_or_reactivate_participant_flow(
db_session=db_session,
incident_id=task.incident.id,
user_email=task_in.owner.individual.email,
)
update_data = task_in.dict(
skip_defaults=True, exclude={"assignees", "owner", "creator", "incident"}
)
for field in update_data.keys():
setattr(task, field, update_data[field])
if task_in.owner:
task.owner = participant_service.get_by_incident_id_and_email(
db_session=db_session,
incident_id=task.incident.id,
email=task_in.owner.individual.email,
)
if task_in.assignees:
assignees = []
for i in task_in.assignees:
assignees.append(
participant_service.get_by_incident_id_and_email(
db_session=db_session,
incident_id=task.incident.id,
email=i.individual.email,
)
)
task.assignees = assignees
# if we have an external task plugin enabled, attempt to update the external resource as well
# we don't currently have a good way to get the correct file_id (we don't store a task <-> relationship)
# lets try in both the incident doc and PIR doc
drive_task_plugin = plugin_service.get_active_instance(
db_session=db_session, project_id=task.incident.project.id, plugin_type="task"
)
if drive_task_plugin:
if sync_external:
try:
if task.incident.incident_document:
file_id = task.incident.incident_document.resource_id
drive_task_plugin.instance.update(
file_id, task.resource_id, resolved=task.status
)
except Exception:
if task.incident.incident_review_document:
file_id = task.incident.incident_review_document.resource_id
drive_task_plugin.instance.update(
file_id, task.resource_id, resolved=task.status
)
db_session.commit()
return task