sync/phab/listen.py (109 lines of code) (raw):

"""Poll the Phabricator feed and queue revision events for processing."""
import time
import re

from phabricator import Phabricator
import newrelic.agent

from .. import log
from ..tasks import handle

logger = log.get_logger(__name__)

# Matches the "<revision number>:" marker (the "12345:" in "D12345:") that
# separates the action description from the revision title in feed text.
RE_EVENT = re.compile(r"[0-9]{5,}:")
# Matches stories about commits ("committed r<CALLSIGN><hex hash>:" etc.).
RE_COMMIT = re.compile(r"(committed|accepted|added a reverting change for) r[A-Z]+[a-f0-9]+:")


class PhabEventListener:
    """Periodically read the Phabricator feed and dispatch revision events.

    Each relevant story is tagged with a ``type`` (see ``event_mapping``) and
    sent to ``handle.apply_async`` as ``("phabricator", event)``.
    """

    # Story texts (up to the revision number) that are deliberately skipped.
    ignore_list = ["added inline comments to D",
                   "added a comment to D",
                   "added a reviewer for D",
                   "added reviewers for D",
                   "removed a reviewer for D",
                   "removed reviewers for D",
                   "requested review of D",
                   "requested changes to D",
                   "added a subscriber to D",
                   "added a project to D",
                   "edited reviewers for D",
                   "updated the summary of D",  # Maybe useful to upstream info?
                   "accepted D",  # Maybe useful to upstream info?
                   "retitled D",  # Maybe useful to upstream info?
                   "blocking reviewer(s) for D",
                   "planned changes to D",
                   "updated subscribers of D",
                   "resigned from D",
                   "changed the edit policy for D",
                   "removed a project from D",
                   "updated D",
                   "changed the visibility for D",
                   "updated the test plan for D"]

    # Story text -> event type. A value of None means "recognised, but skip".
    event_mapping = {
        "updated the diff for D": "commit",
        "created D": "commit",
        "closed D": "closed",
        "abandoned D": "abandoned",
        "added a reverting change for D": None,  # Not sure what this is yet
        "reopened D": "commit",  # This may need its own event type
    }

    # A page of exactly this many stories is treated as "there may be more".
    # NOTE(review): assumed to match feed.query's default page size - confirm.
    feed_page_size = 100

    def __init__(self, config):
        self.running = True
        self.timer_in_seconds = config['phabricator']['listener']['interval']
        # Last story examined by parse(); its chronologicalKey is the cursor
        # used by the next get_feed() call.
        self.latest = None

        self.phab = Phabricator(host='https://phabricator.services.mozilla.com/api/',
                                token=config['phabricator']['token'])
        self.phab.update_interfaces()

    def run(self):
        """Fetch and parse the feed every ``timer_in_seconds`` while ``running`` is set."""
        # Run until told to stop.
        while self.running:
            feed = self.get_feed()
            self.parse(feed)
            time.sleep(self.timer_in_seconds)

    @staticmethod
    def _chrono_key(feed_story_tuple):
        """Sort key: integer chronologicalKey of a ``(storyPHID, story)`` pair."""
        return int(feed_story_tuple[1]["chronologicalKey"])

    @newrelic.agent.background_task(name='feed-fetching', group='Phabricator')
    def get_feed(self, before=None):
        """Fetch feed stories from Phabricator, sorted oldest first.

        :param before: chronological key passed to ``feed.query`` as its
            ``before`` cursor. When omitted and a story has already been
            examined, the key of ``self.latest`` is used instead.
        :returns: list of story dicts (each with ``storyPHID`` added), sorted
            by ascending ``chronologicalKey``.

        Only when a cursor is set are further pages requested. The next cursor
        is the newest key of the previous full page.
        NOTE(review): this relies on ``before=`` returning stories *newer* than
        the key - confirm against the Conduit feed.query docs.
        """
        if self.latest and before is None:
            before = int(self.latest['chronologicalKey'])

        feed = []
        # Keep fetching pages until an empty or partial page shows we are done.
        while True:
            result = self.phab.feed.query(before=before, view='text')
            if not result.response:
                break
            stories = sorted(result.response.items(), key=self._chrono_key)
            page = [self.map_feed_tuple(story) for story in stories]
            feed.extend(page)
            if len(page) != self.feed_page_size or before is None:
                break
            # Full page: there may be more events we wish to fetch.
            before = int(page[-1]["chronologicalKey"])
        return feed

    @newrelic.agent.background_task(name='feed-parsing', group='Phabricator')
    def parse(self, feed):
        """Classify feed stories in the given order and queue the relevant ones.

        ``self.latest`` is advanced past every story examined, including
        skipped ones. Otherwise trailing skipped or unknown stories would be
        re-fetched (and unknown ones re-reported) on every poll. If queueing a
        story raises, the cursor is not advanced past it.
        """
        for event in feed:
            self._process_event(event)
            self.latest = event

    def _process_event(self, event):
        """Tag one story with its ``type`` and queue it, unless it is skipped."""
        if RE_COMMIT.search(event['text']):
            # This is a commit event, ignore it
            return

        # The text before the "<revision number>:" marker describes the action.
        event_text = RE_EVENT.split(event['text'])[0]

        # Check if this is an event we wish to ignore
        if any(event_type in event_text for event_type in PhabEventListener.ignore_list):
            return

        # Map the event text to an event type so we know how to handle it
        event['type'] = self.map_event_type(event_text, event)
        if event['type'] is None:
            return

        handle.apply_async(("phabricator", event))

    @staticmethod
    def map_event_type(event_text, event):
        """Return the ``event_mapping`` value whose key occurs in ``event_text``.

        Returns ``None`` for texts mapped to ``None``, and for unrecognised
        texts. Unrecognised texts are also logged and reported to New Relic as
        an ``unknown_phabricator_event`` custom event.
        """
        # Could use compiled regex expression instead
        for event_type, mapping in PhabEventListener.event_mapping.items():
            if event_type in event_text:
                return mapping

        logger.warning("Unknown phabricator event type: %s" % event_text)
        newrelic.agent.record_custom_event("unknown_phabricator_event", params={
            "event_text": event_text,
            "event": event,
        }, application=newrelic.agent.application())
        return None

    @staticmethod
    def map_feed_tuple(feed_tuple):
        """Return the story dict from ``(storyPHID, story)``, with ``storyPHID`` added in place."""
        story_phid, feed_story = feed_tuple
        feed_story.update({"storyPHID": story_phid})
        return feed_story


def run_phabricator_listener(config):
    """Create a PhabEventListener from ``config`` and run it until it stops."""
    logger.info("Starting Phabricator listener")
    listener = PhabEventListener(config)
    listener.run()


class MockPhabricator(Phabricator):
    """Phabricator stand-in that skips the parent constructor and interface loading.

    ``feed`` starts as ``None``; presumably callers (tests) replace it - confirm.
    """

    def __init__(self, *args, **kwargs):
        self.feed = None

    def update_interfaces(self):
        pass