services/jenkins-run-statistics/jenkins_utils.py (139 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. import json import logging import os import ast import re import ssl import sys import time import urllib.request from typing import Optional from datetime import datetime, timedelta, timezone from typing import Dict, List import dateutil import boto3 from botocore.exceptions import ClientError import botocore import dateutil import dateutil.parser import dateutil.tz import requests from requests_xml import XMLSession JENKINS_ALL_RUNS_API = 'view/all/cc.xml?recursive' JENKINS_RUN_METADATA_API = '{job_url}{run_id}/api/python' JENKINS_RUN_BLUEOCEAN_API = '{jenkins_url}blue/rest/organizations/jenkins/{pipeline_paths}/runs/{run_id}/' JENKINS_JOB_METADATA_API = '{jenkins_url}{job_paths}/api/python' REGEX_URL_EXTRACT_JOB_NAME = re.compile(r'job\/([^\/]+)') class JenkinsJob(object): """ Object representing a Jenkins Job """ def __init__(self, jenkins_url, last_run_id, job_url, full_job_name, last_build_time): self.jenkins_url = jenkins_url self.last_run_id = last_run_id self.job_url = job_url self.full_job_name = full_job_name self.last_scanned_run_id = 0 self.last_build_time = dateutil.parser.parse(last_build_time) self.job_hierarchy = None # Will be retrieved later if required def __repr__(self): return f'{self.full_job_name} @ {self.job_url}' def update_last_scanned_run_id(self, last_scanned_run_id): """ Update the last scanned run id of this run. :param last_scanned_run_id: ID of the last scanned run :return: Nothing """ self.last_scanned_run_id = last_scanned_run_id def get_job_hierarchy(self): """ Query the jenkins API to get the real job hierarchy - e.g. which part of the job name is a folder, which one is the job name and which one is the branch name (if applicable). This is necessary because there are multiple methods to define Jenkins jobs. :return: Dictionary """ if self.job_hierarchy: # Cached result return self.job_hierarchy # By looking at the parent job, if applicable, we can see whether we are currently part of a multi-branch job. # If we are, we have to take the last part of the job name as branch name instead. job_groups = REGEX_URL_EXTRACT_JOB_NAME.findall(self.job_url) self.job_hierarchy = {} if len(job_groups) > 1: # This job has a parent. Inspect it. job_paths = '/'.join(['job/' + job for job in job_groups[:-1]]) url = JENKINS_JOB_METADATA_API.format(jenkins_url=self.jenkins_url, job_paths=job_paths) try: metadata = ast.literal_eval( requests.get( url=url, params={'tree': '_class,fullName'}, allow_redirects=False).text) except SyntaxError: raise Exception(f'Unable to retrieve meta data for parent job of {self} at {url}') if metadata['_class'] == 'org.jenkinsci.plugins.workflow.multibranch.WorkflowMultiBranchProject': logging.debug('%s is part of a MultiBranchProject', self) branch_name = job_groups[-1] # Last entry is the branch name else: logging.debug('%s is probably not part of a MultiBranchProject since the parent class is a %s. Thus,' 'considering it as independenct job.', self, metadata['_class']) branch_name = None job_name = metadata['fullName'] else: logging.debug('%s has no parent, considering it a standalone job', self) branch_name = None job_name = job_groups[0] self.job_hierarchy['job_name'] = job_name self.job_hierarchy['branch_name'] = branch_name return self.job_hierarchy def get_outstanding_jenkins_runs(self): """ Retrieve a list of Jenkins runs that have not been processed yet :return: Array of JenkinsRuns """ return [JenkinsRun(parent_job=self, run_id=run_id) for run_id in range(self.last_scanned_run_id + 1, self.last_run_id)] class JenkinsRun(object): """ Object representing a Jenkins Run """ def __init__(self, parent_job, run_id): self.parent_job = parent_job self.run_id = run_id def __repr__(self): return f'{self.parent_job.full_job_name} #{self.run_id}' def retrieve_metadata(self, tree_filter_string): """ Retrieve this runs' metadata. :param tree_filter_string: A string that limits which fields are being retrieved for performance reasons. This is a Jenkins Rest API feature. :return: Dictionary containing the requested meta data """ try: return ast.literal_eval( requests.get(url=JENKINS_RUN_METADATA_API.format(job_url=self.parent_job.job_url, run_id=self.run_id), params={'tree': tree_filter_string}, allow_redirects=True).text) except SyntaxError: # Jenkins prints a 404 as HTML with a 200 code... logging.debug('Run %s does not exist, skipping...', self) return None def _get_blue_ocean_api(self): """ Get blue ocean API endpoint for this run :return: URL """ job_groups = REGEX_URL_EXTRACT_JOB_NAME.findall(self.parent_job.job_url) pipeline_paths = '/'.join(['pipelines/' + job for job in job_groups]) return JENKINS_RUN_BLUEOCEAN_API.format(jenkins_url=self.parent_job.jenkins_url, pipeline_paths=pipeline_paths, run_id=self.run_id) def retrieve_nodes(self): """ Retrieve all Jenkins nodes associated with this run. :return: List JenkinsNode """ try: response = requests.get(url=self._get_blue_ocean_api() + 'nodes', allow_redirects=True).json() except json.decoder.JSONDecodeError: # Jenkins sometimes prints a 404 as HTML with a 200 code... return None if 'code' in response and response['code'] is not 200: logging.error('Error retrieving nodes for run %s: %s', self, response['message']) return None jenkins_nodes = list() for json_node_entry in response: if not json_node_entry['state']: logging.debug('Step %s of %s is empty, skipping', json_node_entry['displayName'], self) logging.debug(json_node_entry) continue jenkins_nodes.append(JenkinsNode(parent_run=self, json_node_entry=json_node_entry)) return jenkins_nodes class JenkinsNode(object): """ Object representing a Jenkins node that is part of a Jenkins run """ def __init__(self, parent_run, json_node_entry): self.parent_run = parent_run self.result = json_node_entry['result'] self.type = json_node_entry['type'] self.display_name = json_node_entry['displayName'] self.start_timestamp = dateutil.parser.parse(json_node_entry['startTime']).timestamp() self.duration_ms = json_node_entry['durationInMillis'] self._steps_api_link = json_node_entry['_links']['steps']['href'] def get_steps(self): """ Return the underlying steps that are being executed as part of this Jenkins node :return: """ try: response = requests.get(url=self.parent_run.parent_job.jenkins_url + self._steps_api_link, allow_redirects=True).json() except json.decoder.JSONDecodeError: # Jenkins sometimes prints a 404 as HTML with a 200 code... return None return [JenkinsStep(parent_step=self, json_step_entry=json_step_entry) for json_step_entry in response] class JenkinsStep(object): """ Object representing a Jenkins step that is part of a Jenkins node """ def __init__(self, parent_step, json_step_entry): self.parent_step = parent_step self.duration_ms = json_step_entry['durationInMillis'] def _retrieve_jenkins_jobs(jenkins_url): """ Query the Jenkins server and return all jenkins jobs and the last run id :return: Array of JenkinsJobs """ session = XMLSession() try: r = session.get(url=jenkins_url + JENKINS_ALL_RUNS_API) r.raise_for_status() except requests.exceptions.HTTPError as err: logging.error(err) # <Project activity="Sleeping" lastBuildStatus="Success" lastBuildLabel="756" # webUrl="http://jenkins.mxnet-ci.amazon-ml.com/job/Broken_Link_Checker_Pipeline/" # name="Broken_Link_Checker_Pipeline" lastBuildTime="2018-11-30T01:12:59Z"/> # # <Project activity="Sleeping" lastBuildStatus="Success" lastBuildLabel="1" # webUrl="http://jenkins.mxnet-ci.amazon-ml.com/job/incubator-mxnet/job/PR-10008/" # name="incubator-mxnet » PR-10008" lastBuildTime="2018-03-06T18:19:44Z"/> return [JenkinsJob(jenkins_url=jenkins_url, last_run_id=int(run.attrs['lastBuildLabel']), job_url=run.attrs['webUrl'], full_job_name=run.attrs['name'], last_build_time=run.attrs['lastBuildTime']) for run in r.xml.xpath('//Project')]