services/metadata_service/api/task.py (113 lines of code) (raw):
from services.data import TaskRow
from services.data.postgres_async_db import AsyncPostgresDB
from services.data.tagging_utils import apply_run_tags_to_db_response
from services.utils import has_heartbeat_capable_version_tag, read_body
from services.metadata_service.api.utils import format_response, \
handle_exceptions
import json
from aiohttp import web
import asyncio
class TaskApi(object):
_task_table = None
lock = asyncio.Lock()
def __init__(self, app):
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks",
self.get_tasks,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/filtered_tasks",
self.get_filtered_tasks,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}",
self.get_task,
)
app.router.add_route(
"POST",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/task",
self.create_task,
)
app.router.add_route("POST",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/heartbeat",
self.tasks_heartbeat)
self._async_table = AsyncPostgresDB.get_instance().task_table_postgres
self._async_run_table = AsyncPostgresDB.get_instance().run_table_postgres
self._async_metadata_table = AsyncPostgresDB.get_instance().metadata_table_postgres
self._db = AsyncPostgresDB.get_instance()
@format_response
@handle_exceptions
async def get_tasks(self, request):
"""
---
description: get all tasks associated with the specified step.
tags:
- Tasks
parameters:
- name: "flow_id"
in: "path"
description: "flow_id"
required: true
type: "string"
- name: "run_number"
in: "path"
description: "run_number"
required: true
type: "string"
- name: "step_name"
in: "path"
description: "step_name"
required: true
type: "string"
produces:
- text/plain
responses:
"200":
description: successful operation. Return tasks
"405":
description: invalid HTTP Method
"""
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
db_response = await self._async_table.get_tasks(flow_id, run_number, step_name)
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
return db_response
@format_response
@handle_exceptions
async def get_filtered_tasks(self, request):
"""
---
description: get all task ids that match the provided metadata field name and/or value.
tags:
- Tasks
parameters:
- name: "flow_id"
in: "path"
description: "flow_id"
required: true
type: "string"
- name: "run_number"
in: "path"
description: "run_number"
required: true
type: "string"
- name: "step_name"
in: "path"
description: "step_name"
required: true
type: "string"
- name: "metadata_field_name"
in: "query"
description: "Metadata field name to filter with"
type: "string"
- name: "pattern"
in: "query"
description: "A regexp pattern to filter the metadata values on"
type: "string"
produces:
- text/plain
responses:
"200":
description: successful operation. Return tasks
"405":
description: invalid HTTP Method
"""
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
# possible filters
metadata_field = request.query.get("metadata_field_name", None)
pattern = request.query.get("pattern", None)
db_response, _ = await self._async_metadata_table.get_filtered_task_pathspecs(flow_id, run_number, step_name, metadata_field, pattern)
return db_response
@format_response
@handle_exceptions
async def get_task(self, request):
"""
---
description: get all artifacts associated with the specified task.
tags:
- Tasks
parameters:
- name: "flow_id"
in: "path"
description: "flow_id"
required: true
type: "string"
- name: "run_number"
in: "path"
description: "run_number"
required: true
type: "string"
- name: "step_name"
in: "path"
description: "step_name"
required: true
type: "string"
- name: "task_id"
in: "path"
description: "task_id"
required: true
type: "integer"
produces:
- text/plain
responses:
"200":
description: successful operation. Return task
"405":
description: invalid HTTP Method
"""
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
task_id = request.match_info.get("task_id")
db_response = await self._async_table.get_task(
flow_id, run_number, step_name, task_id
)
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
return db_response
@format_response
@handle_exceptions
async def create_task(self, request):
"""
---
description: This end-point allow to test that service is up.
"tags" and "system_tags" values will be persisted to DB, but will not be
returned by read endpoints - the related run's "tags" and "system_tags" will
be returned instead.
tags:
- Tasks
parameters:
- name: "flow_id"
in: "path"
description: "flow_id"
required: true
type: "string"
- name: "run_number"
in: "path"
description: "run_number"
required: true
type: "string"
- name: "step_name"
in: "path"
description: "step_name"
required: true
type: "string"
- name: "body"
in: "body"
description: "body"
required: true
schema:
type: object
properties:
user_name:
type: string
tags:
type: object
system_tags:
type: object
task_id:
type: string
produces:
- 'text/plain'
responses:
"202":
description: successful operation. Return newly registered task
"400":
description: invalid HTTP Request
"405":
description: invalid HTTP Method
"""
flow_id = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
body = await read_body(request.content)
user = body.get("user_name")
tags = body.get("tags")
system_tags = body.get("system_tags")
task_name = body.get("task_id")
client_supports_heartbeats = has_heartbeat_capable_version_tag(system_tags)
if task_name and task_name.isnumeric():
return web.Response(status=400, body=json.dumps(
{"message": "provided task_name may not be a numeric"}))
run_number, run_id = await self._db.get_run_ids(flow_id, run_number)
task = TaskRow(
flow_id=flow_id,
run_number=run_number,
run_id=run_id,
step_name=step_name,
task_name=task_name,
user_name=user,
tags=tags,
system_tags=system_tags,
)
db_response = await self._async_table.add_task(task, fill_heartbeat=client_supports_heartbeats)
db_response = await apply_run_tags_to_db_response(flow_id, run_number, self._async_run_table, db_response)
if client_supports_heartbeats and db_response.response_code == 200:
await self._async_run_table.update_heartbeat(flow_id, run_number)
return db_response
@format_response
@handle_exceptions
async def tasks_heartbeat(self, request):
"""
---
description: update hb
tags:
- Tasks
parameters:
- name: "flow_id"
in: "path"
description: "flow_id"
required: true
type: "string"
- name: "run_number"
in: "path"
description: "run_number"
required: true
type: "string"
- name: "step_name"
in: "path"
description: "step_name"
required: true
type: "string"
- name: "task_id"
in: "path"
description: "task_id"
required: true
type: "string"
- name: "body"
in: "body"
description: "body"
required: true
schema:
type: object
produces:
- 'text/plain'
responses:
"200":
description: successful operation. Return newly registered run
"400":
description: invalid HTTP Request
"405":
description: invalid HTTP Method
"""
flow_name = request.match_info.get("flow_id")
run_number = request.match_info.get("run_number")
step_name = request.match_info.get("step_name")
task_id = request.match_info.get("task_id")
await self._async_run_table.update_heartbeat(flow_name, run_number)
return await self._async_table.update_heartbeat(flow_name,
run_number, step_name,
task_id)