in src/dispatch/data/source/service.py [0:0]
def update(*, db_session, source: Source, source_in: SourceUpdate) -> Source:
    """Apply the changes carried by ``source_in`` to ``source`` and commit.

    Related objects (owner, tags, incidents, queries and the ``source_*``
    lookups) are resolved through their service's ``get_by_name_or_raise``
    using the source's project id. Tags, incidents and queries are always
    replaced with the freshly resolved lists; the owner and ``source_*``
    relations are only reassigned when ``source_in`` carries a truthy value
    for them. Every remaining field present in the update payload is copied
    onto ``source`` directly.

    Returns the same ``source`` object after ``db_session.commit()``.
    """
    # Relationship fields are resolved by name below, so they are excluded
    # from the plain attribute copy at the end.
    relationship_fields = {
        "project",
        "owner",
        "tags",
        "incidents",
        "queries",
        "source_environment",
        "source_data_format",
        "source_transport",
        "source_status",
        "source_type",
    }
    current_fields = source.dict()
    changes = source_in.dict(skip_defaults=True, exclude=relationship_fields)

    def resolve(service, **lookup):
        # The project id is read at call time (not hoisted) so that
        # ``source.project`` is only touched when a lookup actually happens.
        return service.get_by_name_or_raise(
            db_session=db_session, project_id=source.project.id, **lookup
        )

    if source_in.owner:
        source.owner = resolve(service_service, service_in=source_in.owner)

    # Collection relationships are rebuilt from scratch on every update.
    source.tags = [resolve(tag_service, tag_in=tag) for tag in source_in.tags]
    source.incidents = [
        resolve(incident_service, incident_in=incident) for incident in source_in.incidents
    ]
    source.queries = [resolve(query_service, query_in=query) for query in source_in.queries]

    # (attribute on both models, owning service, keyword the service expects)
    single_relations = (
        ("source_environment", environment_service, "source_environment_in"),
        ("source_type", type_service, "source_type_in"),
        ("source_transport", transport_service, "source_transport_in"),
        ("source_data_format", data_format_service, "source_data_format_in"),
        ("source_status", status_service, "source_status_in"),
    )
    for attribute, service, keyword in single_relations:
        requested = getattr(source_in, attribute)
        if requested:
            setattr(source, attribute, resolve(service, **{keyword: requested}))

    # Copy the remaining plain fields, walking the existing source's fields
    # and skipping any the update payload does not mention.
    for name in current_fields:
        if name not in changes:
            continue
        setattr(source, name, changes[name])

    db_session.commit()
    return source