services/lambda-mxnet-ci-bot/CIBot.py (260 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import ast
import hashlib
import hmac
import json
import logging
import os
import re

import jenkinsapi
import requests
from github import Github
from jenkinsapi.jenkins import Jenkins

import secret_manager


class CIBot:
    """
    GitHub webhook handler that lets authorized users trigger MXNet Jenkins CI
    jobs by commenting "@<bot_user> run ci [job1, job2]" on a pull request.
    """

    # GitHub team id of the Apache MXNet committers team. It was retrieved
    # using a committer's credentials (only committers can read the team).
    MXNET_COMMITTER_TEAM_ID = 2413476

    # Jobs under "mxnet-validation/" that the bot can trigger. Hard-coded for
    # now; querying the Jenkins API for this list would be more robust but
    # requires parsing the multibranch folder structure.
    SUPPORTED_JOBS = ('clang', 'edge', 'centos-cpu', 'centos-gpu', 'windows-cpu', 'windows-gpu',
                      'miscellaneous', 'unix-cpu', 'unix-gpu', 'website', 'sanity')

    def __init__(self,
                 repo=os.environ.get("repo"),
                 github_user=None,
                 github_personal_access_token=None,
                 bot_user=None,
                 bot_personal_access_token=None,
                 jenkins_url=os.environ.get("jenkins_url"),
                 jenkins_username=None,
                 jenkins_password=None,
                 apply_secret=True,
                 auto_trigger=True,
                 webhook_secret=None):
        """
        Initializes the CI Bot
        :param repo: GitHub repository that is being referenced ("owner/name"; used in the
                     GitHub issue-comments API URL). Defaults to the "repo" environment variable.
        :param github_user: GitHub username
        :param github_personal_access_token: GitHub authentication token (Personal access token)
        :param bot_user: GitHub username of the bot account (the name users @-mention)
        :param bot_personal_access_token: token of the bot account, used to post comments
        :param jenkins_url: base URL of Jenkins. Defaults to the "jenkins_url" environment variable.
                            NOTE(review): it is concatenated with a path directly, so it
                            presumably must end with "/" — confirm.
        :param jenkins_username: Jenkins username
        :param jenkins_password: Jenkins password / API token
        :param apply_secret: if True, load all credentials (including the webhook secret)
                             from Secrets Manager, overriding the values passed in
        :param auto_trigger: boolean variable to control Automatic triggering of Jenkins
        :param webhook_secret: secret used to verify GitHub webhook signatures when
                               credentials are not loaded from Secrets Manager
        """
        self.repo = repo
        self.github_user = github_user
        self.github_personal_access_token = github_personal_access_token
        self.bot_user = bot_user
        self.bot_personal_access_token = bot_personal_access_token
        self.jenkins_username = jenkins_username
        self.jenkins_password = jenkins_password
        # Always define the attribute so _secure_webhook fails with a clear
        # message (instead of AttributeError) when no secret is configured.
        self.webhook_secret = webhook_secret
        if apply_secret:
            self._get_secret()
        self.auth = (self.github_user, self.github_personal_access_token)
        self.bot_auth = (self.bot_user, self.bot_personal_access_token)
        self.all_jobs = None
        self.jenkins_url = jenkins_url
        # str.translate table that deletes single quotes (ord("'") == 39), so
        # str(list_of_jobs) renders as [a, b] in GitHub comments.
        self.translation = {39: None}
        self.auto_trigger = auto_trigger
        # When automatic triggering of Jenkins is disabled, the bot runs a
        # final CI pass on master after a PR is merged.
        self.run_after_merge = not self.auto_trigger

    def _get_secret(self):
        """
        This method is to get secret value from Secrets Manager
        """
        secret = json.loads(secret_manager.get_secret())
        self.github_user = secret["github_user"]
        self.github_personal_access_token = secret["github_personal_access_token"]
        self.webhook_secret = secret["webhook_secret"]
        self.bot_user = secret["bot_user"]
        self.bot_personal_access_token = secret["bot_personal_access_token"]
        self.jenkins_username = secret["jenkins_username"]
        self.jenkins_password = secret["jenkins_password"]

    @staticmethod
    def _get_record_body(event):
        """
        Returns the forwarded webhook request held in the first record of the Lambda event:
        a dict with (at least) "headers" and "body" keys.
        The record body is a Python-literal string (presumably produced upstream with str(dict) —
        confirm), hence ast.literal_eval, which only evaluates literals and never executes code.
        """
        return ast.literal_eval(event["Records"][0]['body'])

    def _secure_webhook(self, event):
        """
        This method will validate the security of the webhook: it checks that the
        event carries an X-Hub-Signature header and that the header matches the
        HMAC-SHA1 of the raw body computed with our webhook secret.
        :param event: The github event we want to validate
        :return True if the signature matches, False otherwise
        :raises Exception: if the event is unsigned or no webhook secret is configured
        """
        # Validating github event is signed
        try:
            request = self._get_record_body(event)
            git_signed = request['headers']["X-Hub-Signature"]
        except KeyError:
            raise Exception("WebHook from GitHub is not signed")
        if self.webhook_secret is None:
            raise Exception("Webhook secret is not configured")
        if git_signed.startswith('sha1='):
            git_signed = git_signed[len('sha1='):]

        # Signing our event with the same secret as what we assigned to github event
        body = request['body']
        secret_sign = hmac.new(key=self.webhook_secret.encode('utf-8'),
                               msg=body.encode('utf-8'),
                               digestmod=hashlib.sha1).hexdigest()
        # Constant-time comparison; bytes so a non-ASCII (forged) header yields
        # False instead of a TypeError.
        return hmac.compare_digest(git_signed.encode('utf-8'), secret_sign.encode('utf-8'))

    def _find_all_jobs(self):
        """
        This method finds out all the jobs that are currently supported as part of CI
        """
        self.all_jobs = set(self.SUPPORTED_JOBS)

    def _get_job_trigger_token(self, name):
        """
        Returns the multibranch-webhook-trigger token for job `name`, stored in
        Secrets Manager under "<name with '-' replaced by '_'>_token".
        """
        secret = json.loads(secret_manager.get_secret())
        return secret[name.replace('-', '_') + '_token']

    def _pending_build_cleanup(self, job_instance, name):
        """
        Stops the last build of job_instance if the job is queued or running, so the
        build about to be invoked replaces it instead of waiting behind it.
        :param job_instance: jenkinsapi Job object
        :param name: full job name (currently unused; kept for interface compatibility)
        """
        if job_instance.is_queued_or_running():
            logging.info('Status of last build : running')
            if job_instance.get_last_build().stop():
                logging.info('Turned off pending build')
            return

        logging.info('No pending build')
        try:
            latest_build = job_instance.get_last_build().get_status()
        except jenkinsapi.custom_exceptions.NoBuildData:
            # A freshly discovered branch (e.g. a new PR-N) has no builds yet.
            # The status is informational only, so don't fail the trigger.
            logging.info('No previous build found')
            return
        logging.info(f'Status of last build : {latest_build}')

    def _trigger_job(self, jenkins_obj, job, branch):
        """
        This method triggers a particular jenkins job
        :param jenkins_obj Jenkins Object
        :param job name of the job to be triggered
        :param branch e.g. master/PR-N
        :return True when the job was invoked or a branch scan was requested
        :raises Exception: if the job cannot be invoked
        """
        try:
            name = "mxnet-validation/" + job + "/" + branch
            job_instance = jenkins_obj[name]
            # check if the job is already in queue or running and kill that pending job first in that case
            self._pending_build_cleanup(job_instance, name)
            logging.info(f'invoking {name}')
            response = job_instance.invoke(block=False)
            logging.info(response)
            return True
        except jenkinsapi.custom_exceptions.UnknownJob:
            if self.auto_trigger:
                raise Exception("Unable to invoke job due to unknownJob error")
            # Jenkins Automatic Trigger is disabled, so the branch may not be
            # discovered yet: trigger a scan of the multi-branch pipeline.
            url = self.jenkins_url + "multibranch-webhook-trigger/invoke"
            headers = {"token": self._get_job_trigger_token(job)}
            r = requests.post(url, headers=headers)
            logging.info(r.text)
            return True
        except Exception as e:
            raise Exception(f"Unable to invoke job due to {e}") from e

    def _get_jenkins_obj(self):
        """
        This method returns an object of Jenkins instantiated using username, password
        """
        return Jenkins(self.jenkins_url, username=self.jenkins_username, password=self.jenkins_password)

    def _trigger_ci(self, jobs, branch):
        """
        This method is responsible for triggering the CI
        :param jobs: The jobs to trigger CI
        :param branch: PR Number or Master branch
        :return list of jobs that were successfully triggered
        :raises Exception: if any job fails to trigger
        """
        # get jenkins object
        jenkins_obj = self._get_jenkins_obj()
        # invoke CI via jenkins api
        logging.info(jobs)
        success_jobs = []
        try:
            for job in jobs:
                logging.info(job)
                if self._trigger_job(jenkins_obj, job, branch):
                    success_jobs.append(job)
        except Exception as e:
            logging.exception("Unexpected error while triggering Jenkins jobs")
            raise Exception("Jenkins unable to trigger") from e
        return success_jobs

    def _get_github_object(self):
        """
        This method returns github object initialized with Github personal access token
        """
        return Github(self.github_personal_access_token)

    def _is_mxnet_committer(self, comment_author):
        """
        This method checks if the comment author is a member of the Apache MXNet
        committers team on GitHub (team id MXNET_COMMITTER_TEAM_ID in the "apache"
        organization), using the GitHub API.
        NOTE(review): reading the team presumably requires the token owner to have
        access to it — confirm.
        """
        github_obj = self._get_github_object()
        team = github_obj.get_organization('apache').get_team(self.MXNET_COMMITTER_TEAM_ID)
        return team.has_in_members(github_obj.get_user(comment_author))

    def _is_authorized(self, comment_author, pr_author):
        """
        This method checks if the comment author is authorized or not
        :param comment_author user ID of the user who commented on the PR
        :param pr_author user ID of the Author of the PR
        :response True/False indicates if comment_author is authorized or not
        """
        # authorized users:
        # 1. PR Author
        # 2. MXNet Committer
        # 3. CI Admin
        # TODO : check for CI Admin
        # Check the author first: it is free, the committer check is an API call.
        return comment_author == pr_author or self._is_mxnet_committer(comment_author)

    def _parse_jobs_from_comment(self, string):
        """
        This method parses jobs from the user comment on the PR: the comma-separated
        labels between the first "[" and the last "]", whitespace-normalized and lowercased.
        """
        substring = string[string.find('[') + 1: string.rfind(']')]
        jobs = [' '.join(label.split()).lower() for label in substring.split(',')]
        logging.info(f'parse jobs {jobs}')
        return jobs

    def parse_webhook_data(self, event):
        """
        This method triggers the CI bot when the appropriate GitHub event is recognized by use of a webhook
        :param event: The event data that is received whenever a PR comment is made
        :return: Log statements which we can track in lambda
        """
        try:
            request = self._get_record_body(event)
            github_event = request['headers']["X-GitHub-Event"]
        except KeyError:
            raise Exception("Not a GitHub Event")
        logging.info(f"github event {github_event}")

        if not self._secure_webhook(event):
            raise Exception("Failed to validate WebHook security")

        try:
            payload = json.loads(request['body'])
        except ValueError:
            raise Exception("Decoding JSON for payload failed")

        # find all jobs currently run in CI
        self._find_all_jobs()
        if not self.all_jobs:
            raise Exception("Unable to gather jobs from the CI")

        if github_event == "pull_request":
            # Handled entirely here; pull_request payloads carry no "comment",
            # so they must not fall through to the comment handling below.
            self._handle_pull_request(payload)
            return

        if github_event in ["check_suite", "check_run", "status"]:
            logging.info('Irrelevant event. Ignore.')
            return

        if payload.get("action") == 'deleted':
            logging.info('comment deleted. Ignore.')
            return

        if github_event != "issue_comment":
            logging.info(f'GitHub Event unsupported by CI Bot: {github_event}')
            return

        self._handle_issue_comment(payload)

    def _handle_pull_request(self, payload):
        """
        Handles a pull_request event:
        - "opened": post a help comment describing the bot commands.
        - "closed" and merged into master while run_after_merge is set: trigger all jobs on master.
        Every other action is ignored.
        """
        pr_num = payload['number']
        action = payload['action']
        if action == 'opened':
            logging.info('New PR create event detected. Send help guide.')
            pr_author = payload['pull_request']['user']['login']
            if self.auto_trigger:
                intro_mesg = "All tests are already queued to run once. If tests fail, you can trigger one or more tests again with the following commands:"
            else:
                intro_mesg = "Once your PR is ready for CI checks, invoke the following commands:"
            message = ("Hey @" + pr_author + " , Thanks for submitting the PR \n"
                       + intro_mesg + " \n"
                       "- To trigger all jobs: @" + self.bot_user + " run ci [all] \n"
                       "- To trigger specific jobs: @" + self.bot_user + " run ci [job1, job2] \n"
                       "*** \n"
                       "**CI supported jobs**: " + str(list(self.all_jobs)).translate(self.translation) + "\n"
                       "*** \n"
                       "_Note_: \n Only following 3 categories can trigger CI :"
                       "PR Author, MXNet Committer, Jenkins Admin. \n"
                       "All CI tests must pass before the PR can be merged. \n")
            self.create_comment(pr_num, message)
        elif action == 'closed' and payload['pull_request']['merged'] is True and self.run_after_merge:
            base_ref = payload['pull_request']['base']['ref']
            if base_ref != 'master':
                # PR not merged into master: no need to run CI
                logging.info('PR merged into' + base_ref + '.Hence ignore.')
                return
            # PR has been merged into master: trigger a final CI run on master
            successful_jobs = self._trigger_ci(self.all_jobs, "master")
            message = "PR #" + str(pr_num) + " merged. Congrats! \n"
            if successful_jobs:
                message += "Jenkins CI successfully triggered : " + str(successful_jobs).translate(self.translation)
            else:
                message += "However, the bot is unable to trigger CI."
            self.create_comment(pr_num, message)
        else:
            # other actions : reopened, deleted
            logging.info('Irrelevant PR related event. Ignore.')

    def _handle_issue_comment(self, payload):
        """
        Handles an issue_comment event: if the comment mentions the bot on a PR with
        "run ci [jobs]" and the author is authorized, trigger those jobs on branch PR-<number>.
        :raises Exception: on an undefined action, no parsed jobs, or no supported jobs
        """
        comment_author = payload["comment"]["user"]["login"]
        # if comment author is the bot itself, ignore
        if comment_author == self.bot_user:
            logging.info('Ignore comments made by bot')
            return

        logging.info(f"payload loaded {payload}")
        issue_num = payload["issue"]["number"]
        body = payload["comment"]["body"]
        mention = "@" + str(self.bot_user)
        # GitHub mentions are case-insensitive; match them that way so the span
        # we slice is located in the original (non-lowercased) body.
        mention_match = re.search(re.escape(mention), body, flags=re.IGNORECASE)
        if mention_match is None:
            logging.error("CI Bot is not called")
            return

        if "pull_request" not in payload["issue"]:
            # This means the bot didn't get invoked from a PR
            message = "Hey @" + comment_author + " \n @" + self.bot_user + " can only be invoked on a PR."
            self.create_comment(issue_num, message)
            logging.error("Bot invoked on an Issue instead of PR")
            return

        # Capture from the mention up to the first "]" after it. Without a
        # closing bracket the phrase stays empty and is rejected below.
        start = mention_match.start()
        end = body.find("]", start)
        phrase = body[start:end + 1] if end != -1 else ""
        # remove @<bot_user> from the phrase
        phrase = re.sub(re.escape(mention), '', phrase, flags=re.IGNORECASE)
        logging.info(phrase)
        # collapse whitespace: "run ci[job1]" and "run ci [job1]" are treated the same way
        phrase = ' '.join(phrase.split())
        action = phrase[0:phrase.find('[')].strip()
        logging.info(f'action {action}')

        if action.lower() != 'run ci':
            message = "Undefined action detected. \n" \
                      "Permissible actions are : run ci [all], run ci [job1, job2] \n" \
                      "Example : @" + self.bot_user + " run ci [all] \n" \
                      "Example : @" + self.bot_user + " run ci [centos-cpu, clang]"
            self.create_comment(issue_num, message)
            logging.error(f'Undefined action by user: {action}')
            raise Exception("Undefined action by user")

        user_jobs = self._parse_jobs_from_comment(phrase)
        if not user_jobs:
            logging.error(f'Message typed by user: {phrase}')
            raise Exception("No jobs found from PR comment")

        # keep only the requested jobs that CI actually supports
        if user_jobs == ['all']:
            valid_jobs = self.all_jobs
        else:
            valid_jobs = list(set(user_jobs).intersection(self.all_jobs))
        if not valid_jobs:
            logging.error(f'Jobs entered by user: {set(user_jobs)}')
            logging.error(f'CI supported Jobs: {set(self.all_jobs)}')
            message = "None of the jobs entered are supported. \n" \
                      "Jobs entered by user: " + str(user_jobs).translate(self.translation) + "\n" \
                      "CI supported Jobs: " + str(list(self.all_jobs)).translate(self.translation) + "\n"
            self.create_comment(issue_num, message)
            raise Exception("Provided jobs don't match the ones supported by CI")

        pr_author = payload["issue"]["user"]["login"]
        if not self._is_authorized(comment_author, pr_author):
            logging.info(f'Unauthorized user: {comment_author}')
            message = "Unauthorized access detected. \n" \
                      "Only following 3 categories can trigger CI : \n" \
                      "PR Author, MXNet Committer, Jenkins Admin."
            self.create_comment(issue_num, message)
            return

        logging.info(f'Authorized user: {comment_author}')
        # branch to be triggered is PR-N where N is the PR number
        successful_jobs = self._trigger_ci(valid_jobs, "PR-" + str(issue_num))
        if successful_jobs:
            message = "Jenkins CI successfully triggered : " + str(successful_jobs).translate(self.translation)
        else:
            message = "Authorized user recognized. However, the bot is unable to trigger CI."
        self.create_comment(issue_num, message)

    def create_comment(self, issue_num, message):
        """
        This method will post a comment on an issue/PR as the CI bot
        :param issue_num: The issue we want to comment
        :param message: The comment message we want to send
        :return True if GitHub answered 201 Created, False otherwise
        """
        send_msg = {"body": message}
        issue_comments_url = f'https://api.github.com/repos/{self.repo}/issues/{issue_num}/comments'
        response = requests.post(issue_comments_url, data=json.dumps(send_msg), auth=self.bot_auth)
        if response.status_code == 201:
            logging.info(f'Successfully commented {send_msg} to: {issue_num}')
            return True
        try:
            details = json.dumps(response.json())
        except ValueError:
            # Error responses are not guaranteed to be JSON; fall back to raw text.
            details = response.text
        logging.error(f'Could not comment \n {details}')
        return False