services/data/db_utils.py (82 lines of code) (raw):
import asyncio
from typing import List, Dict, Any
import psycopg2
import collections
import datetime
import time
import json
DBResponse = collections.namedtuple("DBResponse", "response_code body")
DBPagination = collections.namedtuple("DBPagination", "limit offset count page")
def aiopg_exception_handling(exception):
err_msg = str(exception)
body = {"err_msg": err_msg}
if isinstance(exception, asyncio.TimeoutError):
body = {
"err_msg": {
"type": "timeout error",
}
}
elif isinstance(exception, psycopg2.Error):
# this means that this is a psycopg2 exception
# since this is of type `psycopg2.Error` we can use https://www.psycopg.org/docs/module.html#psycopg2.Error
body = {
"err_msg": {
"pgerror": exception.pgerror,
"pgcode": exception.pgcode,
"diag": None
if exception.diag is None
else {
"message_primary": exception.diag.message_primary,
"severity": exception.diag.severity,
},
}
}
if isinstance(exception, psycopg2.IntegrityError):
if "duplicate key" in err_msg:
return DBResponse(response_code=409, body=json.dumps(body))
elif "foreign key" in err_msg:
return DBResponse(response_code=404, body=json.dumps(body))
else:
return DBResponse(response_code=500, body=json.dumps(body))
elif isinstance(exception, psycopg2.errors.UniqueViolation):
return DBResponse(response_code=409, body=json.dumps(body))
elif isinstance(exception, IndexError):
return DBResponse(response_code=404, body={})
else:
return DBResponse(response_code=500, body=json.dumps(body))
def get_db_ts_epoch_str():
return str(int(round(time.time() * 1000)))
def new_heartbeat_ts():
return int(datetime.datetime.utcnow().timestamp())
def translate_run_key(v: str):
value = str(v)
return "run_number" if value.isnumeric() else "run_id", value
def translate_task_key(v: str):
value = str(v)
return "task_id" if value.isnumeric() else "task_name", value
def get_exposed_run_id(run_number, run_id):
if run_id is not None:
return run_id
return run_number
def get_exposed_task_id(task_id, task_name):
if task_name is not None:
return task_name
return task_id
def get_latest_attempt_id_for_tasks(artifacts):
attempt_ids = {}
for artifact in artifacts:
attempt_ids[artifact["task_id"]] = max(
artifact["attempt_id"], attempt_ids.get(artifact["task_id"], 0)
)
return attempt_ids
def filter_artifacts_for_latest_attempt(
artifacts: List[Dict[str, Any]]
) -> List[Dict[str, Any]]:
# `artifacts` is a `list` of dictionaries where each item in the list
# consists of `ArtifactRow` in a dictionary form
attempt_ids = get_latest_attempt_id_for_tasks(artifacts)
return filter_artifacts_by_attempt_id_for_tasks(artifacts, attempt_ids)
def filter_artifacts_by_attempt_id_for_tasks(
artifacts: List[Dict[str, Any]], attempt_for_tasks: Dict[str, Any]
) -> List[dict]:
# `artifacts` is a `list` of dictionaries where each item in the list
# consists of `ArtifactRow` in a dictionary form
# `attempt_for_tasks` is a dictionary for form : {task_id:attempt_id}
result = []
for artifact in artifacts:
if artifact["attempt_id"] == attempt_for_tasks[artifact["task_id"]]:
result.append(artifact)
return result