services/data/tagging_utils.py (20 lines of code) (raw):

from services.data.db_utils import DBResponse import copy async def apply_run_tags_to_db_response(flow_id, run_number, run_table_postgres, db_response: DBResponse) -> DBResponse: """ We want read APIs to return steps, tasks and artifact objects with tags and system_tags set to their ancestral Run. This is a prerequisite for supporting Run-based tag mutation. """ # we will return a modified copy of db_response new_db_response = copy.deepcopy(db_response) # Only replace tags if response code is legit # Object creation ought to return 201 (let's prepare for that) if new_db_response.response_code not in (200, 201): return new_db_response if isinstance(new_db_response.body, list): items_to_modify = new_db_response.body else: items_to_modify = [new_db_response.body] if not items_to_modify: return new_db_response # items_to_modify now references all the items we want to modify # The ancestral run must be successfully read from DB db_response_for_run = await run_table_postgres.get_run(flow_id, run_number) if db_response_for_run.response_code != 200: return DBResponse(response_code=500, body=db_response_for_run.body) run = db_response_for_run.body for item_as_dict in items_to_modify: item_as_dict['tags'] = run['tags'] item_as_dict['system_tags'] = run['system_tags'] return new_db_response