in sync/command.py [0:0]
def do_list(git_gecko: Repo, git_wpt: Repo, sync_type: str, error: bool = False,
**kwargs: Any) -> None:
from . import downstream
from . import landing
from . import upstream
syncs: list[SyncProcess] = []
def filter_sync(sync):
if error:
return sync.error is not None and sync.status == "open"
return True
for cls in [upstream.UpstreamSync, downstream.DownstreamSync, landing.LandingSync]:
if not sync_type or cls.sync_type in sync_type:
syncs.extend(item for item in cls.load_by_status(git_gecko, git_wpt, "open")
if filter_sync(item))
for sync in syncs:
extra = []
if isinstance(sync, downstream.DownstreamSync):
try_push = sync.latest_try_push
if try_push:
if try_push.try_rev:
extra.append("https://treeherder.mozilla.org/#/jobs?repo=try&revision=%s" %
try_push.try_rev)
if try_push.taskgroup_id:
extra.append(try_push.taskgroup_id)
error_data = sync.error
error_msg = ""
if error_data is not None:
msg = error_data["message"]
if msg is not None:
error_msg = ("ERROR: %s" % msg.split("\n", 1)[0])
print("%s %s %s bug:%s PR:%s %s%s" %
("*"if sync.error else " ",
sync.sync_type,
sync.status,
sync.bug,
sync.pr,
" ".join(extra),
error_msg))