sync/tasks.py (112 lines of code) (raw):

import os
import traceback
import time

import filelock
import newrelic.agent

from . import bug
from . import env
from . import gh
from . import handlers
from . import log
from . import repos
from . import settings
from .errors import RetryableError
from .worker import worker

logger = log.get_logger(__name__)

# Lazily-built mapping of task name -> handler instance; see get_handlers().
handler_map = None

# Module-level file lock shared by every locked task; see setup_lock().
_lock = None


@settings.configure
def setup_lock(config):
    """Create the module-level file lock if it does not exist yet.

    The lock file is ``sync.lock`` inside ``config["root"]``. Calling this
    again once the lock exists is a no-op. ``config`` is not passed by the
    callers in this module (they call ``setup_lock()`` with no arguments), so
    it is supplied by the ``settings.configure`` decorator.
    """
    global _lock
    if _lock is None:
        path = os.path.join(config["root"], "sync.lock")
        logger.info("Using lockfile at path %s" % path)
        _lock = filelock.FileLock(path)


def with_lock(f):
    """Decorate ``f`` so that it only runs while holding the sync file lock.

    The wrapper creates the lock on first use, runs ``f`` with the lock held,
    then pauses briefly *after releasing the lock* before returning ``f``'s
    result. Any exception (from acquiring the lock or from ``f``) is logged
    with its traceback and re-raised unchanged.
    """

    def inner(*args, **kwargs):
        if _lock is None:
            setup_lock()
        try:
            with _lock:
                rv = f(*args, **kwargs)
            # If some other task is waiting on the lock, give it a chance to
            # run before this task completes. That means that manual commands
            # will always win over celery tasks, which we want.
            # This must happen outside the ``with`` block (lock released) and
            # before the return, otherwise it is never reached.
            time.sleep(0.1)
            return rv
        except Exception:
            logger.error(traceback.format_exc())
            raise

    # Present the wrapper under the wrapped function's name and docstring.
    # Done by hand (rather than functools.wraps) so that only these two
    # attributes are copied.
    inner.__name__ = f.__name__
    inner.__doc__ = f.__doc__
    return inner


@settings.configure
def get_handlers(config):
    """Return the mapping of task name to handler, building it on first call.

    The handler instances are created once and cached in the module-level
    ``handler_map``; later calls return the same dict.
    """
    global handler_map
    if handler_map is None:
        handler_map = {
            "github": handlers.GitHubHandler(config),
            "push": handlers.PushHandler(config),
            "decision-task": handlers.DecisionTaskHandler(config),
            "try-task": handlers.TryTaskHandler(config),
            "taskgroup": handlers.TaskGroupHandler(config),
            "phabricator": handlers.PhabricatorHandler(config),
        }
    return handler_map


@settings.configure
def setup(config):
    """Initialise the environment and return the gecko and wpt git repos.

    Sets the shared environment via ``env.set_env``, constructs the Gecko and
    web-platform-tests repositories, the GitHub client and the Bugzilla
    client, then logs the repository locations and the enabled sync tasks.

    Returns:
        A ``(git_gecko, git_wpt)`` tuple of the objects returned by each
        repository's ``repo()`` method.
    """
    # The environment is first set with only the config, and again below once
    # the Bugzilla and GitHub clients exist.
    # NOTE(review): presumably the constructors below read the config through
    # env -- confirm before reordering.
    env.set_env(config, None, None)

    gecko_repo = repos.Gecko(config)
    git_gecko = gecko_repo.repo()
    wpt_repo = repos.WebPlatformTests(config)
    git_wpt = wpt_repo.repo()

    gh_wpt = gh.GitHub(config["web-platform-tests"]["github"]["token"],
                       config["web-platform-tests"]["repo"]["url"])
    bz = bug.Bugzilla(config)

    env.set_env(config, bz, gh_wpt)

    logger.info("Gecko repository: %s" % git_gecko.working_dir)
    logger.info("wpt repository: %s" % git_wpt.working_dir)
    logger.info("Tasks enabled: %s" %
                (", ".join(config["sync"]["enabled"].keys())))

    return git_gecko, git_wpt


# NOTE(review): Celery documents ``retry_backoff`` / ``retry_backoff_max``
# together with ``autoretry_for``; the tasks below retry via an explicit
# ``self.retry()`` call instead -- confirm the backoff settings take effect.
@worker.task(bind=True, max_retries=6, retry_backoff=60, retry_backoff_max=3840)
@with_lock
def handle(self, task, body):
    """Dispatch ``body`` to the handler registered for ``task``.

    Args:
        task: Name of the handler to run; one of the keys returned by
            ``get_handlers()``. An unknown name is logged as an error and
            otherwise ignored.
        body: Payload passed through to the handler unchanged.

    A ``RetryableError`` from the handler schedules a retry of this task with
    the error's ``wrapped`` exception. Any other exception is logged together
    with ``body`` and re-raised.
    """
    # Not named ``handlers``: that would shadow the imported handlers module.
    task_handlers = get_handlers()
    if task not in task_handlers:
        logger.error("No handler for %s" % task)
        return

    logger.info("Running task %s" % task)
    newrelic.agent.add_custom_parameter("task", task)
    git_gecko, git_wpt = setup()
    try:
        task_handlers[task](git_gecko, git_wpt, body)
    except RetryableError as e:
        self.retry(exc=e.wrapped)
    except Exception:
        logger.error(body)
        logger.error(traceback.format_exc())
        raise


@worker.task(bind=True, max_retries=6, retry_backoff=60, retry_backoff_max=3840)
@with_lock
@settings.configure
def land(self, config):
    """Run the landing handler, retrying the task on ``RetryableError``."""
    git_gecko, git_wpt = setup()
    try:
        handlers.LandingHandler(config)(git_gecko, git_wpt, {})
    except RetryableError as e:
        self.retry(exc=e.wrapped)


@worker.task
@with_lock
@settings.configure
def cleanup(config):
    """Run the cleanup handler with an empty payload."""
    git_gecko, git_wpt = setup()
    handlers.CleanupHandler(config)(git_gecko, git_wpt, {})


@worker.task
@with_lock
@settings.configure
def retrigger(config):
    """Run the retrigger handler with an empty payload."""
    git_gecko, git_wpt = setup()
    handlers.RetriggerHandler(config)(git_gecko, git_wpt, {})


@worker.task
@with_lock
@settings.configure
def update_bugs(config):
    """Run the bug-update handler with an empty payload."""
    git_gecko, git_wpt = setup()
    handlers.BugUpdateHandler(config)(git_gecko, git_wpt, {})