sync/load.py (71 lines of code) (raw):

from __future__ import annotations from . import log from .env import Environment from typing import Iterable, TYPE_CHECKING from git.repo.base import Repo if TYPE_CHECKING: from sync.sync import SyncProcess env = Environment() logger = log.get_logger(__name__) def get_pr_sync(git_gecko: Repo, git_wpt: Repo, pr_id: int, log: bool = True, ) -> SyncProcess | None: from . import downstream from . import upstream sync = downstream.DownstreamSync.for_pr(git_gecko, git_wpt, pr_id) if not sync: sync = upstream.UpstreamSync.for_pr(git_gecko, git_wpt, pr_id) if log: if sync: logger.info(f"Got sync {sync!r} for PR {pr_id}") else: logger.info("No sync found for PR %s" % pr_id) return sync def get_bug_sync(git_gecko: Repo, git_wpt: Repo, bug_number: int, statuses: Iterable[str] | None = None ) -> dict[str, set[SyncProcess]]: from . import downstream from . import landing from . import upstream syncs = landing.LandingSync.for_bug(git_gecko, git_wpt, bug_number, statuses=statuses, flat=False) if not syncs: syncs = upstream.UpstreamSync.for_bug(git_gecko, git_wpt, bug_number, statuses=statuses, flat=False) if not syncs: syncs = downstream.DownstreamSync.for_bug(git_gecko, git_wpt, bug_number, statuses=statuses, flat=False) return syncs def get_syncs(git_gecko: Repo, git_wpt: Repo, sync_type: str, obj_id: int, status: str | None = None, seq_id: int | None = None ) -> set[SyncProcess]: from . import downstream from . import landing from . import upstream cls_types = { "downstream": downstream.DownstreamSync, "landing": landing.LandingSync, "upstream": upstream.UpstreamSync } cls = cls_types[sync_type] syncs = cls.load_by_obj(git_gecko, git_wpt, obj_id, seq_id=seq_id) if status: syncs = {sync for sync in syncs if sync.status == status} return syncs