in services/metadata_service/api/artifact.py [0:0]
def __init__(self, app):
    """Wire up the artifact endpoints and keep handles to the database.

    Each entry of the route table below is registered on ``app.router``
    in the order listed. Afterwards the artifact table, the run table and
    the database object itself are fetched through
    ``AsyncPostgresDB.get_instance()`` and stored on ``self``.

    Args:
        app: Object exposing ``router.add_route(method, path, handler)``
            (presumably the aiohttp application -- confirm with caller).
    """
    # (HTTP method, URL template, handler). Order is significant for
    # registration and is kept exactly as it was.
    route_table = (
        (
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}"
            "/tasks/{task_id}/artifacts/{artifact_name}",
            self.get_artifact,
        ),
        (
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}"
            "/tasks/{task_id}/artifacts/{artifact_name}"
            "/attempt/{attempt_id}",
            self.get_artifact_with_attempt,
        ),
        (
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}"
            "/tasks/{task_id}/artifacts",
            self.get_artifacts_by_task,
        ),
        (
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}"
            "/tasks/{task_id}/attempt/{attempt_id}/artifacts",
            self.get_artifacts_by_task_attempt,
        ),
        (
            "GET",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}/artifacts",
            self.get_artifacts_by_step,
        ),
        (
            "GET",
            "/flows/{flow_id}/runs/{run_number}/artifacts",
            self.get_artifacts_by_run,
        ),
        (
            # The only write endpoint; note the singular "artifact" suffix.
            "POST",
            "/flows/{flow_id}/runs/{run_number}/steps/{step_name}"
            "/tasks/{task_id}/artifact",
            self.create_artifacts,
        ),
    )
    for http_method, url_template, handler in route_table:
        app.router.add_route(http_method, url_template, handler)

    # Handles used by the request handlers: artifact table, run table,
    # and the database accessor itself.
    self._async_table = AsyncPostgresDB.get_instance().artifact_table_postgres
    self._async_run_table = AsyncPostgresDB.get_instance().run_table_postgres
    self._db = AsyncPostgresDB.get_instance()