# services/ui_backend_service/api/log.py
from typing import Optional, Tuple

from aiohttp import web
from multidict import MultiDict

from services.data.db_utils import DBResponse, DBPagination, translate_run_key, translate_task_key
from services.utils import handle_exceptions, web_response
from .utils import format_response_list, get_pathspec_from_request, logger

STDOUT = 'log_location_stdout'
STDERR = 'log_location_stderr'

class LogApi(object):
def __init__(self, app, db, cache=None):
self.db = db
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/logs/out",
self.get_task_log_stdout,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/logs/err",
self.get_task_log_stderr,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/logs/out/download",
self.get_task_log_stdout_file,
)
app.router.add_route(
"GET",
"/flows/{flow_id}/runs/{run_number}/steps/{step_name}/tasks/{task_id}/logs/err/download",
self.get_task_log_stderr_file,
)
self.metadata_table = self.db.metadata_table_postgres
self.task_table = self.db.task_table_postgres
# Cache to hold already fetched logs
self.cache = getattr(cache, "log_cache", None)
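
    # Illustrative only (not wired to anything): the routes registered above
    # answer requests of roughly this shape, for a hypothetical pathspec:
    #
    #   GET /flows/HelloFlow/runs/42/steps/start/tasks/7/logs/out
    #   GET /flows/HelloFlow/runs/42/steps/start/tasks/7/logs/err/download
    #
    # The /logs/out and /logs/err endpoints return paginated JSON rows; the
    # /download variants stream the same content as a plain-text attachment.
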
@handle_exceptions
async def get_task_log_stdout(self, request):
"""
---
description: Get STDOUT log of a Task
tags:
- Task
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Path/step_name'
- $ref: '#/definitions/Params/Path/task_id'
- $ref: '#/definitions/Params/Custom/attempt_id'
produces:
- application/json
responses:
"200":
                description: Return a task's stdout log
schema:
$ref: '#/definitions/ResponsesLog'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"404":
description: Log for task could not be found
schema:
$ref: '#/definitions/ResponsesError404'
"500":
description: Internal Server Error (with error id)
schema:
$ref: '#/definitions/ResponsesLogError500'
"""
return await self.get_task_log(request, STDOUT)
@handle_exceptions
async def get_task_log_stderr(self, request):
"""
---
description: Get STDERR log of a Task
tags:
- Task
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Path/step_name'
- $ref: '#/definitions/Params/Path/task_id'
- $ref: '#/definitions/Params/Custom/attempt_id'
produces:
- application/json
responses:
"200":
                description: Return a task's stderr log
schema:
$ref: '#/definitions/ResponsesLog'
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"404":
description: Log for task could not be found
schema:
$ref: '#/definitions/ResponsesError404'
"500":
description: Internal Server Error (with error id)
schema:
$ref: '#/definitions/ResponsesLogError500'
"""
return await self.get_task_log(request, STDERR)
@handle_exceptions
async def get_task_log_stdout_file(self, request):
"""
---
        description: Download the STDOUT log of a Task as a plain-text file
tags:
- Task
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Path/step_name'
- $ref: '#/definitions/Params/Path/task_id'
- $ref: '#/definitions/Params/Custom/attempt_id'
produces:
- text/plain
responses:
"200":
                description: Return a task's stdout log as a file download
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"404":
description: Log for task could not be found
schema:
$ref: '#/definitions/ResponsesError404'
"500":
description: Internal Server Error (with error id)
schema:
$ref: '#/definitions/ResponsesLogError500'
"""
return await self.get_task_log_file(request, STDOUT)
@handle_exceptions
async def get_task_log_stderr_file(self, request):
"""
---
        description: Download the STDERR log of a Task as a plain-text file
tags:
- Task
parameters:
- $ref: '#/definitions/Params/Path/flow_id'
- $ref: '#/definitions/Params/Path/run_number'
- $ref: '#/definitions/Params/Path/step_name'
- $ref: '#/definitions/Params/Path/task_id'
- $ref: '#/definitions/Params/Custom/attempt_id'
produces:
- text/plain
responses:
"200":
                description: Return a task's stderr log as a file download
"405":
description: invalid HTTP Method
schema:
$ref: '#/definitions/ResponsesError405'
"404":
description: Log for task could not be found
schema:
$ref: '#/definitions/ResponsesError404'
"500":
description: Internal Server Error (with error id)
schema:
$ref: '#/definitions/ResponsesLogError500'
"""
return await self.get_task_log_file(request, STDERR)
async def get_task_by_request(self, request):
flow_id, run_number, step_name, task_id, attempt_id = \
get_pathspec_from_request(request)
run_id_key, run_id_value = translate_run_key(run_number)
task_id_key, task_id_value = translate_task_key(task_id)
conditions = [
"flow_id = %s",
"{run_id_key} = %s".format(run_id_key=run_id_key),
"step_name = %s",
"{task_id_key} = %s".format(task_id_key=task_id_key)
]
values = [flow_id, run_id_value, step_name, task_id_value]
if attempt_id:
conditions.append("attempt_id = %s")
values.append(attempt_id)
# NOTE: Must enable joins so task has attempt_id present for filtering.
# Log cache action requires a task with an attempt_id,
# otherwise it defaults to attempt 0
db_response, *_ = await self.task_table.find_records(
fetch_single=True,
conditions=conditions,
values=values,
order=["attempt_id DESC"],
enable_joins=True,
expanded=True
)
if db_response.response_code == 200:
return db_response.body
return None
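
    # For illustration: with numeric run and task keys, the lookup above builds
    # a filter roughly equivalent to
    #
    #   WHERE flow_id = %s AND run_number = %s AND step_name = %s AND task_id = %s
    #   ORDER BY attempt_id DESC
    #
    # translate_run_key / translate_task_key may substitute different column
    # names for non-numeric identifiers, so the exact columns depend on the
    # request.
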
async def get_task_log(self, request, logtype=STDOUT):
"fetches log and emits it as a list of rows wrapped in json"
task = await self.get_task_by_request(request)
if not task:
return web_response(404, {'data': []})
limit, page, reverse_order = get_pagination_params(request)
lines, page_count = await read_and_output(self.cache, task, logtype, limit, page, reverse_order)
# paginated response
response = DBResponse(200, lines)
pagination = DBPagination(limit, limit * (page - 1), len(response.body), page)
status, body = format_response_list(request, response, pagination, page, page_count)
return web_response(status, body)
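
    # Pagination example (illustrative values): a request with ?_page=3&_limit=500
    # produces a DBPagination offset of 500 * (3 - 1) = 1000, while page_count is
    # reported by the log cache action rather than computed from the database.
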
async def get_task_log_file(self, request, logtype=STDOUT):
"fetches log and emits it as a single file download response"
task = await self.get_task_by_request(request)
if not task:
return web_response(404, {'data': []})
log_filename = "{type}_{flow_id}_{run_number}_{step_name}_{task_id}-{attempt}.txt".format(
type="stdout" if logtype == STDOUT else "stderr",
flow_id=task['flow_id'],
run_number=task['run_number'],
step_name=task['step_name'],
task_id=task['task_id'],
attempt=task['attempt_id']
)
def _gen():
return stream_pages(self.cache, task, logtype, output_raw=True)
return await file_download_response(request, log_filename, _gen)
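
    # Filename example (hypothetical pathspec): attempt 0 of task 7 in step
    # "start" of run 42 of HelloFlow downloads as
    #
    #   stdout_HelloFlow_42_start_7-0.txt
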
async def read_and_output(cache_client, task, logtype, limit=0, page=1, reverse_order=False, output_raw=False):
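    """Fetch one page of log content from the cache and return a (content, page_count) tuple."""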
res = await cache_client.cache.GetLogFile(task, logtype, limit, page, reverse_order, output_raw, invalidate_cache=True)
if res.has_pending_request():
async for event in res.stream():
if event["type"] == "error":
# raise error, there was an exception during fetching.
raise LogException(event["message"], event["id"], event["traceback"])
await res.wait() # wait until results are ready
log_response = res.get()
if log_response is None:
# This should not happen under normal circumstances.
raise LogException("Cache returned None for log content and raised no errors. \
The cache server might be experiencing issues.")
return log_response["content"], log_response["pages"]
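

# Usage sketch for the helper above (names illustrative): the cache action is
# expected to resolve to a {"content": ..., "pages": ...} dict, which is the
# contract relied on by the return statement.
#
#   lines, page_count = await read_and_output(cache, task, STDOUT, limit=1000, page=1)
#
# `lines` appears to be a list of log rows for the JSON endpoints and a raw
# string when output_raw=True (as used by the file download path).
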
async def stream_pages(cache_client, task, logtype, output_raw):
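    """Yield the log page by page (1000 lines per page) until the cache returns an empty page."""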
page = 1
while True:
logs, _ = await read_and_output(cache_client, task, logtype, limit=1000, page=page, output_raw=output_raw)
if not logs:
break
yield logs
page += 1
def get_pagination_params(request):
"""extract pagination params from request
"""
# Page
page = max(int(request.query.get("_page", 1)), 1)
# Limit
# Default limit is 1000, maximum is 10_000
limit = min(int(request.query.get("_limit", 1000)), 10000)
# Order
order = request.query.get("_order")
reverse_order = order is not None and order.startswith("-row")
return limit, page, reverse_order
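

# Query-string examples for the rules above (illustrative):
#
#   ?_page=2&_limit=500    -> (500, 2, False)
#   ?_limit=50000          -> (10000, 1, False)   # limit capped at 10_000
#   ?_order=-row&_page=0   -> (1000, 1, True)     # page floored at 1
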
async def file_download_response(request, filename, async_line_iterator):
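    """Stream the lines produced by async_line_iterator to the client as an attachment named `filename`."""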
response = web.StreamResponse(
headers=MultiDict({'Content-Disposition': 'Attachment;filename={}'.format(filename)}),
)
await response.prepare(request)
    # NOTE: this cannot surface errors thrown by the cache, as the response status cannot be changed after .prepare() has been called.
async for lines in async_line_iterator():
await response.write(lines.encode("utf-8"))
await response.write_eof()
return response
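

# Minimal usage sketch (mirrors get_task_log_file above; names are illustrative):
#
#   async def handler(request):
#       async def _gen():
#           yield "line 1\n"
#           yield "line 2\n"
#       return await file_download_response(request, "example.txt", _gen)
#
# Note that the iterator factory is called, not awaited, and must produce an
# async iterator of strings, matching how stream_pages is supplied above.
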
class LogException(Exception):
def __init__(self, msg='Failed to read log', id='log-error', traceback_str=None):
self.message = msg
self.id = id
self.traceback_str = traceback_str
def __str__(self):
return self.message