services/data/models.py (325 lines of code) (raw):

import time

from .db_utils import get_exposed_run_id, get_exposed_task_id


def _epoch_millis():
    """Return the current wall-clock time as integer milliseconds."""
    return int(round(time.time() * 1000))


class FlowRow:
    """In-memory representation of a flow record.

    Attributes are plain values copied from the constructor arguments;
    ``ts_epoch`` falls back to the current time in milliseconds.
    """

    flow_id: str = None
    user_name: str = None
    ts_epoch: int = 0

    def __init__(self, flow_id, user_name, ts_epoch=None, tags=None, system_tags=None):
        """Store the given fields, stamping ``ts_epoch`` when it is None."""
        self.flow_id = flow_id
        self.user_name = user_name
        self.ts_epoch = _epoch_millis() if ts_epoch is None else ts_epoch
        self.tags = tags
        self.system_tags = system_tags

    def serialize(self, expanded: bool = False):
        """Return the row as a dict.

        Args:
            expanded: Accepted for parity with the other row classes; this
                method does not read it.
        """
        payload = {
            "flow_id": self.flow_id,
            "user_name": self.user_name,
            "ts_epoch": self.ts_epoch,
            "tags": self.tags,
            "system_tags": self.system_tags,
        }
        return payload


class RunRow:
    """In-memory representation of a run record."""

    flow_id: str = None
    run_number: int = None
    run_id: str = None
    user_name: str = None
    ts_epoch: int = 0

    def __init__(
        self,
        flow_id,
        user_name,
        run_number=None,
        run_id=None,
        ts_epoch=None,
        tags=None,
        system_tags=None,
        last_heartbeat_ts=None,
    ):
        """Store the given fields, stamping ``ts_epoch`` when it is None."""
        self.flow_id = flow_id
        self.user_name = user_name
        self.run_number = run_number
        self.run_id = run_id
        self.tags = tags
        self.system_tags = system_tags
        self.ts_epoch = _epoch_millis() if ts_epoch is None else ts_epoch
        self.last_heartbeat_ts = last_heartbeat_ts

    def serialize(self, expanded: bool = False):
        """Return the row as a dict.

        Args:
            expanded: When true, emit the raw ``run_number`` and ``run_id``
                as separate keys. Otherwise emit a single ``run_number`` key
                holding the result of
                ``get_exposed_run_id(run_number, run_id)``.
        """
        if expanded:
            run_fields = {"run_number": self.run_number, "run_id": self.run_id}
        else:
            run_fields = {
                "run_number": get_exposed_run_id(self.run_number, self.run_id),
            }
        # The unpacking position keeps the key order of the original literals.
        return {
            "flow_id": self.flow_id,
            **run_fields,
            "user_name": self.user_name,
            "ts_epoch": self.ts_epoch,
            "tags": self.tags,
            "system_tags": self.system_tags,
            "last_heartbeat_ts": self.last_heartbeat_ts
        }


class StepRow:
    """In-memory representation of a step record."""

    flow_id: str = None
    run_number: int = None
    run_id: str = None
    step_name: str = None
    user_name: str = None
    ts_epoch: int = 0
    tags = None
    system_tags = None

    def __init__(
        self,
        flow_id,
        run_number,
        run_id,
        user_name,
        step_name,
        ts_epoch=None,
        tags=None,
        system_tags=None,
    ):
        """Store the given fields.

        A ``run_id`` of None is replaced by ``str(run_number)``, and a
        ``ts_epoch`` of None by the current time in milliseconds.
        """
        self.flow_id = flow_id
        self.run_number = run_number
        self.run_id = str(run_number) if run_id is None else run_id
        self.step_name = step_name
        self.user_name = user_name
        self.ts_epoch = _epoch_millis() if ts_epoch is None else ts_epoch
        self.tags = tags
        self.system_tags = system_tags

    def serialize(self, expanded: bool = False):
        """Return the row as a dict.

        Args:
            expanded: When true, emit the raw ``run_number`` and ``run_id``
                as separate keys. Otherwise emit a single ``run_number`` key
                holding the result of
                ``get_exposed_run_id(run_number, run_id)``.
        """
        if expanded:
            run_fields = {"run_number": self.run_number, "run_id": self.run_id}
        else:
            run_fields = {
                "run_number": get_exposed_run_id(self.run_number, self.run_id),
            }
        # The unpacking position keeps the key order of the original literals.
        return {
            "flow_id": self.flow_id,
            **run_fields,
            "step_name": self.step_name,
            "user_name": self.user_name,
            "ts_epoch": self.ts_epoch,
            "tags": self.tags,
            "system_tags": self.system_tags,
        }


class TaskRow:
    """In-memory representation of a task record."""

    flow_id: str = None
    run_number: int = None
    run_id: str = None
    step_name: str = None
    task_id: int = None
    task_name: str = None
    user_name: str = None
    ts_epoch: int = 0
    tags = None
    system_tags = None

    def __init__(
        self,
        flow_id,
        run_number,
        run_id,
        user_name,
        step_name,
        task_id=None,
        task_name=None,
        ts_epoch=None,
        tags=None,
        system_tags=None,
        last_heartbeat_ts=None,
    ):
        """Store the given fields, stamping ``ts_epoch`` when it is None."""
        self.flow_id = flow_id
        self.run_number = run_number
        self.run_id = run_id
        self.step_name = step_name
        self.task_id = task_id
        self.task_name = task_name
        self.user_name = user_name
        self.ts_epoch = _epoch_millis() if ts_epoch is None else ts_epoch
        self.tags = tags
        self.system_tags = system_tags
        self.last_heartbeat_ts = last_heartbeat_ts

    def serialize(self, expanded: bool = False):
        """Return the row as a dict.

        Args:
            expanded: When true, emit the raw ``run_number``/``run_id`` and
                ``task_id``/``task_name`` pairs. Otherwise emit only
                ``run_number`` and ``task_id``, holding the results of
                ``get_exposed_run_id`` and ``get_exposed_task_id``.
        """
        if expanded:
            run_fields = {"run_number": self.run_number, "run_id": self.run_id}
            task_fields = {"task_id": self.task_id, "task_name": self.task_name}
        else:
            run_fields = {
                "run_number": get_exposed_run_id(self.run_number, self.run_id),
            }
            task_fields = {
                "task_id": get_exposed_task_id(self.task_id, self.task_name),
            }
        # The unpacking positions keep the key order of the original literals.
        return {
            "flow_id": self.flow_id,
            **run_fields,
            "step_name": self.step_name,
            **task_fields,
            "user_name": self.user_name,
            "ts_epoch": self.ts_epoch,
            "tags": self.tags,
            "system_tags": self.system_tags,
            "last_heartbeat_ts": self.last_heartbeat_ts
        }


class MetadataRow:
    """In-memory representation of a metadata record attached to a task."""

    flow_id: str = None
    run_number: int = None
    run_id: str = None
    step_name: str = None
    task_id: int = None
    task_name: str = None
    id: int = None  # autoincrement
    field_name: str = None
    value: dict = None
    type: str = None
    user_name: str = None
    ts_epoch: int = 0
    tags = None
    system_tags = None

    def __init__(
        self,
        flow_id,
        run_number,
        run_id,
        step_name,
        task_id,
        task_name,
        id,
        field_name,
        value,
        type,
        user_name,
        ts_epoch=None,
        tags=None,
        system_tags=None,
    ):
        """Store the given fields, stamping ``ts_epoch`` when it is None."""
        self.flow_id = flow_id
        self.run_number = run_number
        self.run_id = run_id
        self.step_name = step_name
        self.task_id = task_id
        self.task_name = task_name
        self.field_name = field_name
        self.value = value
        self.type = type
        self.user_name = user_name
        self.ts_epoch = _epoch_millis() if ts_epoch is None else ts_epoch
        self.id = id
        self.tags = tags
        self.system_tags = system_tags

    def serialize(self, expanded: bool = False):
        """Return the row as a dict.

        ``run_number`` and ``task_id`` hold the results of
        ``get_exposed_run_id`` and ``get_exposed_task_id`` respectively.

        Args:
            expanded: Accepted for parity with the other row classes; this
                method does not read it.
        """
        exposed_run = get_exposed_run_id(self.run_number, self.run_id)
        exposed_task = get_exposed_task_id(self.task_id, self.task_name)
        return {
            "id": self.id,
            "flow_id": self.flow_id,
            "run_number": exposed_run,
            "step_name": self.step_name,
            "task_id": exposed_task,
            "field_name": self.field_name,
            "value": self.value,
            "type": self.type,
            "user_name": self.user_name,
            "ts_epoch": self.ts_epoch,
            "tags": self.tags,
            "system_tags": self.system_tags,
        }


class ArtifactRow:
    """In-memory representation of an artifact record attached to a task."""

    flow_id: str = None
    run_number: int = None
    run_id: str = None
    step_name: str = None
    task_id: int = None
    task_name: str = None
    name: str = None
    location: str = None
    sha: str = None
    type: str = None
    content_type: str = None
    user_name: str = None
    attempt_id: int = 0
    ts_epoch: int = 0

    def __init__(
        self,
        flow_id,
        run_number,
        run_id,
        step_name,
        task_id,
        task_name,
        name,
        location,
        ds_type,
        sha,
        type,
        content_type,
        user_name,
        attempt_id,
        ts_epoch=None,
        tags=None,
        system_tags=None,
    ):
        """Store the given fields, stamping ``ts_epoch`` when it is None."""
        self.flow_id = flow_id
        self.run_number = run_number
        self.run_id = run_id
        self.step_name = step_name
        self.task_id = task_id
        self.task_name = task_name
        self.name = name
        self.location = location
        self.ds_type = ds_type
        self.sha = sha
        self.type = type
        self.content_type = content_type
        self.user_name = user_name
        self.attempt_id = attempt_id
        self.ts_epoch = _epoch_millis() if ts_epoch is None else ts_epoch
        self.tags = tags
        self.system_tags = system_tags

    def serialize(self, expanded: bool = False):
        """Return the row as a dict.

        ``run_number`` and ``task_id`` hold the results of
        ``get_exposed_run_id`` and ``get_exposed_task_id`` respectively.

        Args:
            expanded: Accepted for parity with the other row classes; this
                method does not read it.
        """
        exposed_run = get_exposed_run_id(self.run_number, self.run_id)
        exposed_task = get_exposed_task_id(self.task_id, self.task_name)
        return {
            "flow_id": self.flow_id,
            "run_number": exposed_run,
            "step_name": self.step_name,
            "task_id": exposed_task,
            "name": self.name,
            "location": self.location,
            "ds_type": self.ds_type,
            "sha": self.sha,
            "type": self.type,
            "content_type": self.content_type,
            "user_name": self.user_name,
            "attempt_id": self.attempt_id,
            "ts_epoch": self.ts_epoch,
            "tags": self.tags,
            "system_tags": self.system_tags,
        }