services/jenkins-pipeline-monitor/handler.py (120 lines of code) (raw):

import os import boto3 import json import logging import secret_manager from datetime import datetime, timezone from jenkinsapi.jenkins import Jenkins logging.getLogger().setLevel(logging.INFO) logging.getLogger('boto3').setLevel(logging.CRITICAL) logging.getLogger('botocore').setLevel(logging.CRITICAL) release_job_type = ['mxnet_lib', 'python/pypi', 'python/docker'] def get_jenkins_obj(secret): """ This method returns an object of Jenkins instantiated using username, password """ jenkins_url, jenkins_username, jenkins_password = os.environ["JENKINS_URL"], secret["jenkins_username"], secret["jenkins_password"] return Jenkins(jenkins_url, username=jenkins_username, password=jenkins_password) def get_secret(): """ This method is to get secret value from Secrets Manager """ secret = json.loads(secret_manager.get_secret()) return secret def get_pipeline_job(jenkinsObj): job = jenkinsObj["restricted-mxnet-cd/mxnet-cd-release-job"] return job def get_latest_build_number(job): return job.get_last_build().get_number() def get_build_from_build_number(job, build_number): return job.get_build(build_number) def get_build_timestamp(build): return build.get_timestamp() def get_build_date(timestamp): return timestamp.date() def is_latest_day_build(current_build): current_build_timestamp = get_build_timestamp(current_build) current_time_stamp = datetime.now().replace(tzinfo=timezone.utc) # if current build is within 24 hours of the current time seconds_difference = (current_time_stamp - current_build_timestamp).total_seconds() hour_difference = divmod(seconds_difference, 3600)[0] if(hour_difference < 24): return True else: return False def get_latest_day_builds(job, latest_build_number): """ Get all the builds that were triggered in the past 24 hours from the current time i.e. the time when the Lambda function is triggered :param job: Jenkins Job object :param latest_build_number: latest build number from which to start checking :result: List[builds] """ builds = [] current_build_number = latest_build_number while True: current_build = get_build_from_build_number(job, current_build_number) if is_latest_day_build(current_build): builds.append(current_build) current_build_number -= 1 else: break return builds def get_release_job_type(build): return build.get_params()['RELEASE_JOB_TYPE'] def filter_by_release_job_type(latest_day_builds): filtered_builds = [] for build in latest_day_builds: if get_release_job_type(build) in release_job_type: filtered_builds.append(build) return filtered_builds def status_check(builds): """ Check the status of the filtered builds i.e. Check if all the required release job types are present in the pipeline If a build from the list of desired release job types doesn't exist, log the failure else check the status via Jenkins API and report accordingly :param builds """ # dictionary of the type release_job_type: count # e.g. {'mxnet_lib/static':0, 'python/pypi':0} global release_job_type success_count = 0 release_job_type_dict = {el: 0 for el in release_job_type} # iterate over the builds to count number of the desired release job types for build in builds: build_release_job_type = get_release_job_type(build) if build.get_status() == 'SUCCESS': logging.info(f'Successful build {build_release_job_type} {build.get_number()}') else: logging.info(f'Failure build {build_release_job_type} {build.get_number()}') release_job_type_dict[build_release_job_type] += 1 # iterate over the map of release_job_type: count # if 'mxnet_lib/static':1 indicates static jobtype job ran in the pipeline # else 'mxnet_lib/static':0 indicates static jobtype never ran -> log as failed for release_job_type_name, release_job_type_count in release_job_type_dict.items(): if release_job_type_count == 0: logging.info(f'Failure build {release_job_type_name}') elif release_job_type_count == 1: success_count += 1 else: logging.info(f'{release_job_type} ran {release_job_type_count} times') # if success_count = 2 [i.e. len of release_job_type], it means both static & pypi jobs have run if success_count == len(release_job_type): logging.info(f'All the required jobs ran') else: logging.info(f'1/more of the required jobs did not run') def get_cause(build): return build.get_causes()[0]['_class'] def filter_by_upstream_cause(builds, desired_cause): filtered_builds = [] for build in builds: if get_cause(build) == desired_cause: filtered_builds.append(build) return filtered_builds def jenkins_pipeline_monitor(): # retrieve secret from secert manager secret = get_secret() logging.info(f'Secrets retrieved') # get jenkins object jenkinsObj = get_jenkins_obj(secret) logging.info(f'Jenkins obj created') # get relevant pipeline job job = get_pipeline_job(jenkinsObj) logging.info(f'Job fetch {job}') # get the latest build on the pipeline latest_build_number = get_latest_build_number(job) # get builds scheduled for the latest day latest_day_builds = get_latest_day_builds(job, latest_build_number) logging.info(f'latest builds {latest_day_builds}') # exit if no builds found if not latest_day_builds: logging.error('No builds for the latest day') return # filter latest day builds by desired build type a.k.a release job type filtered_builds = filter_by_release_job_type(latest_day_builds) logging.info(f'Builds filtered by desired release job type : {filtered_builds}') # exit if no builds of desired build type if not filtered_builds: logging.error('No builds of desired type') return desired_cause = 'hudson.model.Cause$UpstreamCause' filtered_builds = filter_by_upstream_cause(filtered_builds, desired_cause) # exit if builds not triggered by UpstreamCause if not filtered_builds: logging.error(f'Builds dont belong to desired cause:{desired_cause}') else: logging.info(f'Filtered builds by {desired_cause} : {filtered_builds}') status_check(filtered_builds) def lambda_handler(event, context): try: logging.info(f'Lambda handler invoked') jenkins_pipeline_monitor() except Exception as e: logging.error("Lambda raised an exception! %s", exc_info=e) if __name__ == "__main__": logging.basicConfig(level=logging.DEBUG) lambda_handler(None, None)