utils/common/taskcluster_api.py (106 lines of code) (raw):

""" Typed responses for working with the Taskcluster API. """ from pydantic import BaseModel from typing import Any, Optional import taskcluster class Run(BaseModel): runId: int # 0, state: str # "completed" reasonCreated: str # "scheduled" reasonResolved: str # "completed" scheduled: str # "2024-11-12T13:50:55.910Z" # These are not present during an exception: workerGroup: Optional[str] = None # "us-central1-a" workerId: Optional[str] = None # "4909417939093873369" takenUntil: Optional[str] = None # "2024-11-12T14:10:56.073Z" started: Optional[str] = None # "2024-11-12T13:50:56.076Z" resolved: Optional[str] = None # "2024-11-12T13:52:36.465Z" class Metadata(BaseModel): description: str # "Dummy task that ensures all parts of training pipeline will run" name: str # "all-pipeline-en-lt-2" owner: str # "gregtatum@users.noreply.github.com" source: str # "https://github.com/mozilla/translations/blob/4b99af14117a3e662ee0a27bda93aa1170e964e7/taskcluster/kinds/all-pipeline" class TaskExtra(BaseModel): index: Any parent: str # "e1DMdEzNSGyGhdjaWFYpxQ" class Task(BaseModel): created: str # "2024-11-04T15:39:54.898Z" deadline: str # "2024-11-24T15:39:54.898Z" dependencies: list[str] # [ "CM1cp-ZWSnWkiQ96mKjmvA", "CnkDIu9LRCqI89thrhWAlQ", … ] expires: str # "2025-02-02T15:39:54.898Z" extra: TaskExtra metadata: Metadata payload: Any priority: str # "low" projectId: str # "none" provisionerId: str # "built-in" requires: str # "all-completed" retries: int routes: list[str] # [ "checks" ] schedulerId: str # "translations-level-1" scopes: list[Any] tags: dict[str, Any] taskGroupId: str # "TaeCdUs5Rqq7w1Tbf1PShQ" taskQueueId: str # "built-in/succeed" workerType: str # "succeed" @staticmethod def call(queue: taskcluster.Queue, *args, **kwargs): response: Any = queue.task(*args, **kwargs) return Task(**response) class Status(BaseModel): deadline: str # "2024-11-24T15:39:54.898Z" expires: str # "2025-02-02T15:39:54.898Z" projectId: str # "none" provisionerId: str # "built-in" retriesLeft: int runs: list[Run] schedulerId: str # "translations-level-1" state: str # "completed" taskGroupId: str # "TaeCdUs5Rqq7w1Tbf1PShQ" taskId: str # "CmximseBTi-d8tcWOl-KZA" taskQueueId: str # "built-in/succeed" workerType: str # "succeed" @staticmethod def call(queue: taskcluster.Queue, *args, **kwargs): response: Any = queue.status(*args, **kwargs) return Status(**response["status"]) class TaskAndStatus(BaseModel): task: Task status: Status @staticmethod def call(queue: taskcluster.Queue, *args, **kwargs): # This requires 2 API calls, even though other APIs return both. return TaskAndStatus( task=Task.call(queue, *args, **kwargs), status=Status.call(queue, *args, **kwargs) ) class GetTaskGroup(BaseModel): taskGroupId: str # 'I9uKJEPvQd-1zeItJK0cOQ' schedulerId: str # 'translations-level-1' expires: str # '2025-11-07T21:56:15.759Z' @staticmethod def call(queue: taskcluster.Queue, *args, **kwargs): response: Any = queue.getTaskGroup(*args, **kwargs) return GetTaskGroup(**response) class ListTaskGroup(BaseModel): expires: str # "2025-11-04T16:39:54.296Z" schedulerId: str # "translations-level-1" taskGroupId: str # "TaeCdUs5Rqq7w1Tbf1PShQ" tasks: list[TaskAndStatus] @staticmethod def call(queue: taskcluster.Queue, *args, **kwargs): response: Any = queue.listTaskGroup(*args, **kwargs) return ListTaskGroup(**response) class Artifact(BaseModel): storageType: str # "s3" name: str # "public/build/lex.50.50.enlt.s2t.bin.gz" expires: str # "2025-11-04T15:39:54.626Z" contentType: str # "application/gzip" class ListArtifacts(BaseModel): artifacts: list[Artifact] @staticmethod def call(queue: taskcluster.Queue, *args, **kwargs): response: Any = queue.listArtifacts(*args, **kwargs) return ListArtifacts(**response) class ListDependentTasks(BaseModel): taskId: str tasks: list[TaskAndStatus] @staticmethod def call(queue: taskcluster.Queue, *args, **kwargs): response: Any = queue.listDependentTasks(*args, **kwargs) return ListDependentTasks(**response)