in src/dispatch/document/flows.py [0:0]
def _build_tag_kwargs(tag_types, tags) -> dict:
    """Builds the document template replacements for a collection of tags.

    For every tag type two placeholders are produced: ``tag_<type name>`` and
    ``tag_<type name>.source``. Both default to "N/A". When several tags share
    a tag type their names (and non-empty sources) are joined with ", ".

    Args:
        tag_types: Tag types that must always have a placeholder value.
        tags: Tags whose names and sources fill the placeholders.

    Returns:
        A dict mapping placeholder names to their replacement text.
    """
    not_available = "N/A"
    replacements = {}

    def add(key, value):
        # The first real value replaces the "N/A" default; later ones are appended.
        if replacements[key] == not_available:
            replacements[key] = value
        else:
            replacements[key] += f", {value}"

    # Seed every tag type so its placeholders are replaced even when no tag uses it.
    for tag_type in tag_types:
        replacements[f"tag_{tag_type.name}"] = not_available
        replacements[f"tag_{tag_type.name}.source"] = not_available

    for tag in tags:
        name_key = f"tag_{tag.tag_type.name}"
        source_key = f"{name_key}.source"

        # A tag whose type was not seeded above must not abort the whole update
        # with a KeyError, so fall back to the same defaults here.
        replacements.setdefault(name_key, not_available)
        replacements.setdefault(source_key, not_available)

        add(name_key, tag.name)
        # Only create the source replacement when the tag actually has a source.
        if tag.source:
            add(source_key, tag.source)

    return replacements


def update_document(document: Document, project_id: int, db_session: Session):
    """Updates an existing document.

    Builds the template replacement values that match the document's resource
    type, passes them to the active document plugin, and logs an event for
    case and incident documents.

    Args:
        document: The document to update. Its ``case`` or ``incident``
            relationship is read depending on ``resource_type``.
        project_id: Id of the project used to look up the document plugin and
            the tag types.
        db_session: Database session passed to the services.

    Returns:
        None. When no document plugin is enabled a warning is logged and
        nothing is updated.
    """
    plugin = plugin_service.get_active_instance(
        db_session=db_session, project_id=project_id, plugin_type="document"
    )
    if not plugin:
        log.warning("Document not updated. No document plugin enabled.")
        return

    document_kwargs = {}

    if document.resource_type == DocumentResourceTypes.case:
        document_kwargs = {
            "case_description": document.case.description,
            "case_name": document.case.name,
            "case_owner": document.case.assignee.individual.email,
            "case_priority": document.case.case_priority.name,
            "case_resolution": document.case.resolution,
            "case_severity": document.case.case_severity.name,
            "case_status": document.case.status,
            "case_storage_weblink": resolve_attr(document.case, "storage.weblink"),
            "case_title": document.case.title,
            "case_type": document.case.case_type.name,
        }

    if document.resource_type in (
        DocumentResourceTypes.incident,
        DocumentResourceTypes.review,
    ):
        incident = document.incident
        document_kwargs = {
            "commander_fullname": incident.commander.individual.name,
            "conference_challenge": resolve_attr(incident, "conference.challenge"),
            "conference_weblink": resolve_attr(incident, "conference.weblink"),
            "conversation_weblink": resolve_attr(incident, "conversation.weblink"),
            "description": incident.description,
            "document_weblink": resolve_attr(incident, "incident_document.weblink"),
            "name": incident.name,
            "priority": incident.incident_priority.name,
            "reported_at": incident.reported_at.strftime("%m/%d/%Y %H:%M:%S"),
            "resolution": incident.resolution,
            "severity": incident.incident_severity.name,
            "status": incident.status,
            "storage_weblink": resolve_attr(incident, "storage.weblink"),
            "ticket_weblink": resolve_attr(incident, "ticket.weblink"),
            "title": incident.title,
            "type": incident.incident_type.name,
        }

        # Tag replacements are prefixed with "tag_": for a tag type named actor the
        # template should contain {{tag_actor}} and, for the tag's source,
        # {{tag_actor.source}}. Every tag type of the project gets a placeholder
        # value so that unused ones are rendered as "N/A".
        tag_types = tag_type_service.get_all_by_project(
            db_session=db_session, project_id=project_id
        )
        document_kwargs.update(_build_tag_kwargs(tag_types, incident.tags))

    if document.resource_type == DocumentResourceTypes.review:
        document_kwargs["stable_at"] = document.incident.stable_at.strftime("%m/%d/%Y %H:%M:%S")

    plugin.instance.update(document.resource_id, **document_kwargs)

    # Only case and incident documents log an event; review documents do not.
    if document.resource_type == DocumentResourceTypes.case:
        event_service.log_case_event(
            db_session=db_session,
            source=plugin.plugin.title,
            description=f"{deslug(DocumentResourceTypes.case).lower().capitalize()} updated",
            case_id=document.case.id,
        )

    if document.resource_type == DocumentResourceTypes.incident:
        event_service.log_incident_event(
            db_session=db_session,
            source=plugin.plugin.title,
            description=f"{deslug(DocumentResourceTypes.incident).lower().capitalize()} updated",
            incident_id=document.incident.id,
        )