services/github-bots/LabelBotFullFunctionality/LabelBot.py (214 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import ast
import hashlib
import hmac
import json
import logging
import os
import re
from urllib.parse import quote

from botocore.vendored import requests

import secret_manager


class LabelBot:
    """
    GitHub bot that adds, removes, updates, replaces and predicts issue labels
    in response to webhook events.
    """

    LABEL_PAGE_PARSE = 30  # Limit for total labels per page to parse

    def __init__(self,
                 repo=os.environ.get("repo"),
                 github_user=None,
                 github_oauth_token=None,
                 bot_user=None,
                 bot_oauth_token=None,
                 prediction_url=None,
                 apply_secret=True,
                 webhook_secret=None):
        """
        Initializes the Label Bot
        :param repo: GitHub repository that is being referenced ("owner/name")
        :param github_user: GitHub username used for label operations
        :param github_oauth_token: GitHub authentication token (Personal access token)
        :param bot_user: GitHub username used when the bot posts comments
        :param bot_oauth_token: GitHub authentication token of the bot user
        :param prediction_url: URL of the label prediction service
        :param apply_secret: When True, every credential above (and the webhook secret)
                             is overwritten with the values stored in Secrets Manager
        :param webhook_secret: Shared secret used to validate webhook signatures;
                               only used when apply_secret is False
        """
        self.repo = repo
        self.github_user = github_user
        self.github_oauth_token = github_oauth_token
        self.bot_user = bot_user
        self.bot_oauth_token = bot_oauth_token
        self.prediction_url = prediction_url
        # Always define the attribute so _secure_webhook can report a missing
        # secret instead of failing with an AttributeError.
        self.webhook_secret = webhook_secret
        if apply_secret:
            self._get_secret()
        self.auth = (self.github_user, self.github_oauth_token)
        self.bot_auth = (self.bot_user, self.bot_oauth_token)
        self.all_labels = None

    def _get_rate_limit(self):
        """
        This method gets the remaining rate limit that is left from the GitHub API
        :return Remaining API requests left that GitHub will allow
        """
        res = requests.get('https://api.github.com/rate_limit', auth=self.auth)
        res.raise_for_status()
        data = res.json()['rate']
        return data['remaining']

    def _get_secret(self):
        """
        This method is to get secret value from Secrets Manager
        """
        secret = json.loads(secret_manager.get_secret())
        self.github_user = secret["github_user"]
        self.github_oauth_token = secret["github_oauth_token"]
        self.webhook_secret = secret["webhook_secret"]
        self.bot_user = secret["bot_user"]
        self.bot_oauth_token = secret["bot_oauth_token"]
        self.prediction_url = secret["prediction_url"]

    def _tokenize(self, string):
        """
        This method is to extract labels from comments.
        Labels are the comma separated entries between the first '[' and the last ']'.
        :param string: String parsed from a GitHub comment
        :return List of lower-cased labels with whitespace collapsed; empty when the
                brackets are missing/misordered or contain no label text
        """
        start = string.find('[')
        end = string.rfind(']')
        if start == -1 or end < start:
            # No well-formed "[...]" section, so there is nothing to extract
            return []
        candidates = (' '.join(label.split()).lower() for label in string[start + 1:end].split(','))
        # Drop empty entries produced by "[]" or stray commas such as "[a,,b]"
        return [label for label in candidates if label]

    def _ascii_only(self, raw_string, sub_string):
        """
        This method is to convert all non-alphanumeric characters from raw_string to sub_string
        :param raw_string The original string messy string
        :param sub_string The string we want to convert to
        :return Fully converted string, lower-cased
        """
        converted_string = re.sub("[^0-9a-zA-Z]", sub_string, raw_string)
        return converted_string.lower()

    def _find_all_labels(self):
        """
        This method finds all existing labels in the repo and caches them in self.all_labels
        :return A set of all (lower-cased) labels which have been extracted from the repo
        """
        url = f'https://api.github.com/repos/{self.repo}/labels'
        response = requests.get(url, auth=self.auth)
        response.raise_for_status()

        # Getting total pages of labels present
        if "link" not in response.headers:
            pages = 1
        else:
            # With punctuation replaced by spaces the header ends in
            # "... page <N> rel last", so the third token from the end is N.
            # NOTE(review): relies on rel="last" being the final entry of the
            # Link header - confirm against GitHub's pagination docs.
            pages = int(self._ascii_only(response.headers['link'], " ").split()[-3])

        all_labels = set()
        for page in range(1, pages + 1):
            url = 'https://api.github.com/repos/' + self.repo + '/labels?page=' + str(page) \
                  + '&per_page=%s' % self.LABEL_PAGE_PARSE
            response = requests.get(url, auth=self.auth)
            # Fail loudly on an error response instead of iterating its JSON body
            response.raise_for_status()
            all_labels.update(item['name'].lower() for item in response.json())
        self.all_labels = all_labels
        return set(all_labels)

    def _format_labels(self, labels):
        """
        This method formats labels that a user specifies for a specific issue. This is meant
        to provide functionality for the operations on labels
        :param labels: The messy labels inputted by the user which we want to format
        :return: Formatted labels to send for CRUD operations (only labels present in the repo)
        :raises AssertionError: When the repo labels have not been loaded yet
        """
        # Raised explicitly (rather than via `assert`) so the check survives `python -O`
        if not self.all_labels:
            raise AssertionError("Find all labels first")
        # clean labels, remove duplicated spaces. ex: "hello  world" -> "hello world"
        labels = [" ".join(label.split()) for label in labels]
        labels = [label for label in labels if label.lower() in self.all_labels]
        return labels

    def add_labels(self, issue_num, labels):
        """
        This method is to add a list of labels to one issue.
        It checks whether labels exist in the repo, and adds existing labels to the issue
        :param issue_num: The specific issue number we want to label
        :param labels: The labels which we want to add
        :return True on success, False on failure (the outcome is also logged)
        """
        labels = self._format_labels(labels)
        issue_labels_url = f'https://api.github.com/repos/{self.repo}/issues/{issue_num}/labels'

        response = requests.post(issue_labels_url, json.dumps(labels), auth=self.auth)
        if response.status_code == 200:
            logging.info(f'Successfully added labels to {issue_num}: {labels}.')
            return True
        else:
            logging.error(f'Could not add the labels to {issue_num}: {labels}. '
                          f'\nResponse: {json.dumps(response.json())}')
            return False

    def remove_labels(self, issue_num, labels):
        """
        This method is to remove a list of labels to one issue.
        It checks whether labels exist in the repo, and removes existing labels to the issue
        :param issue_num: The specific issue number we want to label
        :param labels: The labels which we want to remove
        :return True when every label was removed, False as soon as one removal fails
        """
        labels = self._format_labels(labels)
        issue_labels_url = f'https://api.github.com/repos/{self.repo}/issues/{issue_num}/labels/'

        for label in labels:
            # The label name is a URL path segment: percent-encode it so names
            # containing '/', '#', '?' or '%' still address the right label.
            delete_label_url = issue_labels_url + quote(label, safe='')
            response = requests.delete(delete_label_url, auth=self.auth)
            if response.status_code == 200:
                logging.info(f'Successfully removed label to {issue_num}: {label}.')
            else:
                logging.error(f'Could not remove the label to {issue_num}: {label}. '
                              f'\nResponse: {json.dumps(response.json())}')
                return False
        return True

    def update_labels(self, issue_num, labels):
        """
        This method is to update a list of labels to one issue.
        It checks whether labels exist in the repo, and updates existing labels to the issue
        :param issue_num: The specific issue number we want to label
        :param labels: The labels which the issue should have after the update
        :return True on success, False on failure (the outcome is also logged)
        """
        labels = self._format_labels(labels)
        issue_labels_url = f'https://api.github.com/repos/{self.repo}/issues/{issue_num}/labels'

        response = requests.put(issue_labels_url, data=json.dumps(labels), auth=self.auth)
        if response.status_code == 200:
            logging.info(f'Successfully updated labels to {issue_num}: {labels}.')
            return True
        else:
            logging.error(f'Could not update the labels to {issue_num}: {labels}. '
                          f'\nResponse: {json.dumps(response.json())}')
            return False

    def replace_label(self, issue_num, labels):
        """
        This method is to change a label to another in an issue
        :param issue_num: The specific issue number we want to label
        :param labels: The labels which we want to change from and to
        :return: True on success, False on failure (the outcome is also logged)
        """
        labels = self._format_labels(labels)
        if len(labels) != 2:
            logging.error('Must only specify 2 labels when wanting to change labels')
            return False
        logging.info('Label on {} to change from: {} to {}'.format(str(issue_num), str(labels[0]), str(labels[1])))
        # The new label is only added once the old one was removed successfully
        return self.remove_labels(issue_num, [labels[0]]) and self.add_labels(issue_num, [labels[1]])

    def predict_label(self, issue_num):
        """
        This method asks the prediction service for labels of an issue and comments the
        recommendation on the issue. Issues predicted as 'Question' also get the
        'question' label applied.
        :param issue_num: The issue we want label predictions for
        :return True when a prediction was obtained, False when the prediction request failed
        """
        predict_issue = {"issues": [issue_num]}
        header = {"Content-Type": 'application/json'}
        response = requests.post(self.prediction_url, data=json.dumps(predict_issue), headers=header)
        # Check the status before touching the body: an error response is not
        # guaranteed to have the [{"predictions": ...}] shape.
        if response.status_code != 200:
            logging.error("Unable to predict labels")
            return False
        predicted_labels = response.json()[0]["predictions"]
        logging.info(f'Successfully predicted labels to {issue_num}: {predicted_labels}')

        if 'Question' in predicted_labels:
            message = "Hey, this is the MXNet Label Bot and I think you have raised a question. \n" \
                      "For questions, you can also submit on MXNet discussion forum (https://discuss.mxnet.io), " \
                      "where it will get a wider audience and allow others to learn as well. Thanks! \n "
            self.add_labels(issue_num, ['question'])
        else:
            message = "Hey, this is the MXNet Label Bot. \n Thank you for submitting the issue! I will try and " \
                      "suggest some labels so that the appropriate MXNet community members can help " \
                      "resolve it. \n "

        if predicted_labels:
            message += 'Here are my recommended label(s): {}'.format(', '.join(predicted_labels))

        self.create_comment(issue_num, message)
        return True

    def create_comment(self, issue_num, message):
        """
        This method will trigger a comment to an issue by the label bot
        :param issue_num: The issue we want to comment
        :param message: The comment message we want to send
        :return True on success, False on failure (the outcome is also logged)
        """
        send_msg = {"body": message}
        issue_comments_url = f'https://api.github.com/repos/{self.repo}/issues/{issue_num}/comments'

        response = requests.post(issue_comments_url, data=json.dumps(send_msg), auth=self.bot_auth)
        if response.status_code == 201:
            logging.info(f'Successfully commented {send_msg} to: {issue_num}')
            return True
        else:
            logging.error(f'Could not comment \n {json.dumps(response.json())}')
            return False

    def label_action(self, actions):
        """
        This method will perform an actions for the labels that are provided. This function
        delegates the appropriate action to the correct methods.
        :param actions: Mapping of action name ("add", "remove", "update" or "replace")
                        to an (issue number, labels) pair
        :return True on success, False on failure or when no supported action is present
        """
        # Checked in this order; only the first supported action found is executed
        handlers = (
            ("add", self.add_labels),
            ("remove", self.remove_labels),
            ("update", self.update_labels),
            ("replace", self.replace_label),
        )
        for name, handler in handlers:
            if name in actions:
                return handler(actions[name][0], actions[name][1])
        return False

    def _secure_webhook(self, event):
        """
        This method will validate the security of the webhook, it confirms that the secret
        of the webhook is matched and that each github event is signed appropriately
        :param event: The github event we want to validate
        :return True when the signature matches the one computed with our secret, else False
        :raises Exception: When the event carries no signature or no webhook secret is configured
        """
        # Validating github event is signed
        try:
            record = ast.literal_eval(event["Records"][0]['body'])
            git_signed = record['headers']["X-Hub-Signature"]
        except KeyError:
            raise Exception("WebHook from GitHub is not signed")
        git_signed = git_signed.replace('sha1=', '')

        # Signing our event with the same secret as what we assigned to github event
        secret = self.webhook_secret
        if secret is None:
            raise Exception("WebHook secret is not configured")
        body = record['body']
        # NOTE(review): this validates the legacy SHA-1 "X-Hub-Signature" header;
        # consider moving to "X-Hub-Signature-256" once the sender provides it.
        secret_sign = hmac.new(key=secret.encode('utf-8'),
                               msg=body.encode('utf-8'),
                               digestmod=hashlib.sha1).hexdigest()

        # Validating signatures match (constant-time comparison)
        return hmac.compare_digest(git_signed, secret_sign)

    def _handle_issue_comment(self, payload):
        """
        This method executes the label action requested in an issue comment that mentions
        @mxnet-label-bot, e.g. "@mxnet-label-bot add [label1, label2]".
        Comments which do not mention the bot are ignored.
        :param payload: Decoded payload of an "issue_comment" GitHub event
        :raises Exception: When no usable labels or action can be extracted, or the action fails
        """
        comment = payload["comment"]["body"]
        start = comment.find("@mxnet-label-bot")
        if start == -1:
            return

        # Reads the phrase from the mention up to the first ']' after it,
        # and trims extra whitespace to single space
        end = comment.find("]", start)
        phrase = ' '.join(comment[start:end + 1].split())

        labels = self._tokenize(phrase)
        if not labels:
            logging.error(f'Message typed by user: {phrase}')
            raise Exception("Unable to gather labels from issue comments")

        self._find_all_labels()
        if not self.all_labels:
            raise Exception("Unable to gather labels from the repo")

        if not set(labels).intersection(set(self.all_labels)):
            logging.error(f'Labels entered by user: {set(labels)}')
            logging.error(f'Repo labels: {set(self.all_labels)}')
            raise Exception("Provided labels don't match labels from the repo")

        words = phrase.split(" ")
        if len(words) < 2:
            # e.g. "@mxnet-label-bot[label]": no action word was given
            logging.error(f'Message typed by user: {phrase}')
            raise Exception("Unrecognized/Infeasible label action for the mxnet-label-bot")
        # Case so that ( add[label1] ) and ( add [label1] ) are treated the same way
        action = words[1].partition('[')[0].lower()

        issue_num = payload["issue"]["number"]
        actions = {action: (issue_num, labels)}
        if not self.label_action(actions):
            logging.error(f'Unsupported actions: {actions}')
            raise Exception("Unrecognized/Infeasible label action for the mxnet-label-bot")

    def parse_webhook_data(self, event):
        """
        This method triggers the label bot when the appropriate GitHub event is recognized
        by use of a webhook
        :param event: The event data that is received whenever a github issue, issue comment, etc. is made
        :return: Result of predict_label for newly opened issues, otherwise None
                 (progress is reported through log statements which we can track in lambda)
        :raises Exception: When the event is not a signed GitHub event, its payload cannot be
                           decoded, or a requested label action cannot be carried out
        """
        try:
            # The record body is parsed as a Python literal (not JSON).
            # NOTE(review): presumably the upstream forwarder stores str(dict) - confirm.
            record = ast.literal_eval(event["Records"][0]['body'])
            github_event = record['headers']["X-GitHub-Event"]
        except KeyError:
            raise Exception("Not a GitHub Event")

        if not self._secure_webhook(event):
            raise Exception("Failed to validate WebHook security")

        try:
            payload = json.loads(record['body'])
        except ValueError:
            raise Exception("Decoding JSON for payload failed")

        # Grabs actual payload data of the appropriate GitHub event needed for labelling
        if github_event == "issue_comment":
            self._handle_issue_comment(payload)
        # On creation of a new issue, automatically trigger the bot to recommend labels
        elif github_event == "issues" and payload.get("action") == "opened":
            self._find_all_labels()
            return self.predict_label(payload["issue"]["number"])
        else:
            logging.info(f'GitHub Event unsupported by Label Bot: {github_event} {payload.get("action")}')