in services/ui_backend_service/api/notify.py [0:0]
def resource_list(table_name: str, data: Dict):
    """
    Resolve the RESTful resource paths that a table record belongs to.

    The resulting paths are used for working out which Web Socket
    subscriptions the record relates to.

    Parameters
    ----------
    table_name : str
        Name of the table that the record comes from.
    data : Dict
        The record as a dictionary. Its entries fill in the placeholders
        (such as flow_id or run_number) of the table's path templates.

    Returns
    -------
    List
        Formatted resource paths for the record, or an empty list when
        no path templates are registered here for the table.
        example:
        [
            "/runs",
            "/flows/ExampleFlow/runs",
            "/flows/ExampleFlow/runs/1234"
        ]

    Raises
    ------
    KeyError
        If data lacks a key that one of the table's templates refers to.
    """
    # Path templates per table; placeholders are substituted from the record.
    templates_by_table = {
        FLOW_TABLE_NAME: [
            "/flows",
            "/flows/{flow_id}",
        ],
        RUN_TABLE_NAME: [
            "/runs",
            "/flows/{flow_id}/runs",
            "/flows/{flow_id}/runs/{run_number}",
        ],
        STEP_TABLE_NAME: [
            "/flows/{flow_id}/runs/{run_number}/steps",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}",
        ],
        TASK_TABLE_NAME: [
            "/flows/{flow_id}/runs/{run_number}/tasks",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/attempts",
        ],
        ARTIFACT_TABLE_NAME: [
            "/flows/{flow_id}/runs/{run_number}/artifacts",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/artifacts",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/artifacts",
        ],
        METADATA_TABLE_NAME: [
            "/flows/{flow_id}/runs/{run_number}/metadata",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/metadata",
        ],
    }

    templates = templates_by_table.get(table_name)
    if templates is None:
        # Table has no associated resources.
        return []

    formatted_paths = []
    for template in templates:
        formatted_paths.append(template.format(**data))
    return formatted_paths