services/jenkins-run-statistics/statistics.py:

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

"""This script records statistics about Jenkins runs"""

__author__ = 'Marco de Abreu'
__version__ = '0.1'

import logging
import os
import sys
import time
from datetime import datetime, timedelta, timezone

import aws_utils
import jenkins_utils

REGION_NAME = os.environ['REGION']
JENKINS_URL = os.environ['JENKINS_URL']
DYNAMODB_TABLE = os.environ['DYNAMODB_TABLE_NAME']
CLOUDWATCH_METRIC_NAMESPACE = os.environ['CLOUDWATCH_METRIC_NAMESPACE']

# Ignore jobs and runs whose last activity is older than two weeks.
MAXIMUM_LOOKBACK_TIMEFRAME_SECONDS = 60 * 60 * 24 * 7 * 2

DYNAMO_KEY_FULL_JOB_NAME = 'FULL_JOB_NAME'
DYNAMO_VALUE_LAST_SCANNED_RUN_ID = 'LAST_SCANNED_RUN_ID'

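# Illustrative environment configuration (all values below are placeholders,
# not real endpoints or resource names):
#
#   REGION=us-west-2
#   JENKINS_URL=https://jenkins.example.com
#   DYNAMODB_TABLE_NAME=jenkins-run-statistics
#   CLOUDWATCH_METRIC_NAMESPACE=JenkinsRunStatistics
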

def record_jenkins_run_durations(dynamo_db, cloudwatch):
    """
    Main handler to initiate the process of recording Jenkins run durations
    :param dynamo_db: Handle to Boto DynamoDB
    :param cloudwatch: Handle to Boto CloudWatch
    :return: Nothing
    """
    # Basically get a list of all Jenkins jobs and their last run id. Then compare the last run id
    # with our database in order to determine jobs that we haven't scanned entirely or that got new
    # data in the meantime. Then, retrieve all the new runs, record the metrics and update the database.
    jenkins_jobs = jenkins_utils._retrieve_jenkins_jobs(jenkins_url=JENKINS_URL)
    _process_jenkins_jobs(dynamo_db=dynamo_db, cloudwatch=cloudwatch, jenkins_jobs=jenkins_jobs)


def _process_jenkins_jobs(dynamo_db, cloudwatch, jenkins_jobs):
    """
    Process the passed Jenkins jobs and record metrics of the underlying runs
    :param dynamo_db: Handle to Boto DynamoDB
    :param cloudwatch: Handle to Boto CloudWatch
    :param jenkins_jobs: List of Jenkins jobs
    :return: Nothing
    """
    def generate_metric_dimensions():
        job_name = jenkins_job.get_job_hierarchy()['job_name']
        branch_name = jenkins_job.get_job_hierarchy()['branch_name']
        if branch_name and 'PR-' in branch_name:
            # Replace pull request branch names with a generalized name.
            # We don't want to track PR branches individually.
            branch_name = 'Pull Request'

        metric_dimensions = {'Job': job_name}
        if branch_name:
            metric_dimensions['Branch'] = branch_name
        return metric_dimensions

    table = dynamo_db.Table(DYNAMODB_TABLE)
    for jenkins_job in jenkins_jobs:
        time_diff = datetime.now(tz=timezone.utc) - jenkins_job.last_build_time
        if time_diff.total_seconds() >= MAXIMUM_LOOKBACK_TIMEFRAME_SECONDS:
            logging.debug("%s last ran %d days ago, skipping since it's more than two weeks",
                          jenkins_job, time_diff.days)
            continue

        last_processed_run_id = _dynamo_get_last_processed_jenkins_run_id(
            dynamo_table=table, jenkins_job_name=jenkins_job.full_job_name)
        if last_processed_run_id:
            jenkins_job.update_last_scanned_run_id(last_processed_run_id)

        outstanding_jenkins_runs = jenkins_job.get_outstanding_jenkins_runs()
        if not outstanding_jenkins_runs:
            logging.debug('%s has no outstanding runs', jenkins_job)
            continue

        metric_dimensions = generate_metric_dimensions()
        for jenkins_run in outstanding_jenkins_runs:
            if _process_jenkins_run(cloudwatch=cloudwatch, jenkins_run=jenkins_run,
                                    metric_dimensions=dict(metric_dimensions)):
                logging.info('%s has been processed, saving state in database', jenkins_run)
                _dynamo_set_last_processed_jenkins_run_id(dynamo_db=dynamo_db, jenkins_run=jenkins_run)
            else:
                logging.info('%s requested to not be processed further, aborting scan of job', jenkins_run)
                break

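# Illustrative example for generate_metric_dimensions() above (the job and
# branch names are hypothetical): if get_job_hierarchy() returned
#   {'job_name': 'unix-cpu', 'branch_name': 'PR-1234'}
# the resulting dimensions would be
#   {'Job': 'unix-cpu', 'Branch': 'Pull Request'}
# so all pull request branches share a single CloudWatch time series instead
# of creating one series per PR.
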

def _process_jenkins_run(cloudwatch, jenkins_run, metric_dimensions):
    """
    Process a single Jenkins run and record metrics accordingly
    :param cloudwatch: Handle to Boto CloudWatch
    :param jenkins_run: Jenkins run
    :param metric_dimensions: Base CloudWatch metric dimensions for this run
    :return: True if we should continue or False if the job should no longer be crawled,
             e.g. due to still-running jobs
    """
    def process_stage(jenkins_node):
        """
        Process the Jenkins node that is being considered a stage
        :param jenkins_node: Jenkins node
        :return: New stage name
        """
        # The nodes are always in the correct order, so we can use that fact to preserve the
        # information about the stage we are currently in during parallel steps.
        current_stage = jenkins_node.display_name
        stage_metric_dimensions = dict(node_metric_dimensions)
        stage_metric_dimensions['Stage'] = current_stage
        aws_utils.publish_cloudwatch_metric(
            cloudwatch=cloudwatch, metric_name='Stage Duration',
            metric_namespace=CLOUDWATCH_METRIC_NAMESPACE,
            value=jenkins_node.duration_ms / 1000, unix_timestamp=unix_timestamp,
            dimensions=stage_metric_dimensions, unit='Seconds')
        logging.info('= STAGE %s took %s', current_stage,
                     str(timedelta(milliseconds=jenkins_node.duration_ms)))
        return current_stage

    def process_parallel(jenkins_node):
        """
        Process the Jenkins node that is being considered a parallel node
        :param jenkins_node: Jenkins node
        :return: Nothing
        """
        # Determine the duration of each parallel entry by summing all of its steps. This is
        # necessary because durationInMillis contains garbage for these nodes. Thanks, Jenkins!
        steps = jenkins_node.get_steps()
        if not steps:
            logging.error('No steps available')
            return

        parallel_duration_ms = 0
        for step in steps:
            parallel_duration_ms += step.duration_ms

        step_metric_dimensions = dict(node_metric_dimensions)
        step_metric_dimensions['Stage'] = current_stage
        step_metric_dimensions['Step'] = jenkins_node.display_name
        aws_utils.publish_cloudwatch_metric(
            cloudwatch=cloudwatch, metric_name='Step Duration', unit='Seconds',
            value=int(parallel_duration_ms / 1000), unix_timestamp=unix_timestamp,
            metric_namespace=CLOUDWATCH_METRIC_NAMESPACE, dimensions=step_metric_dimensions)
        logging.info('== STEP %s ran for %s', jenkins_node.display_name,
                     str(timedelta(milliseconds=parallel_duration_ms)))

    metadata = jenkins_run.retrieve_metadata(tree_filter_string='duration,building,timestamp,result')
    if metadata and metadata['building']:
        logging.info('%s is still running, skipping...', jenkins_run)
        return False

    # Make sure not to return eagerly because the DynamoDB entry creation has to happen
    # in order to mark the run as processed.
    if not metadata:
        logging.debug('Run %s does not exist, skipping...', jenkins_run)
    else:
        total_duration_ms = metadata['duration']
        unix_timestamp = metadata['timestamp'] / 1000
        time_diff = time.time() - unix_timestamp
        if time_diff >= MAXIMUM_LOOKBACK_TIMEFRAME_SECONDS:
            logging.info("Run %s is from %d days ago, skipping since it's more than two weeks",
                         jenkins_run, int(time_diff / 60 / 60 / 24))
        else:
            run_metric_dimensions = dict(metric_dimensions)
            run_metric_dimensions['Result'] = metadata['result']
            aws_utils.publish_cloudwatch_metric(
                cloudwatch=cloudwatch, metric_namespace=CLOUDWATCH_METRIC_NAMESPACE,
                metric_name='Total Run Duration', unix_timestamp=unix_timestamp,
                dimensions=run_metric_dimensions, unit='Seconds',
                value=total_duration_ms / 1000)
            logging.info('Run %s ran for %s', jenkins_run,
                         str(timedelta(milliseconds=total_duration_ms)))

            nodes = jenkins_run.retrieve_nodes()
            if not nodes:
                logging.debug('Run %s has no child stages', jenkins_run)
            else:
                current_stage = 'Unknown stage'
                for jenkins_node in nodes:
                    node_metric_dimensions = dict(metric_dimensions)
                    if jenkins_node.result:  # This is None if the stage has not been reached
                        # Make sure to differentiate metrics by whether the step was successful or
                        # not. Otherwise, time measurements would be off since some jobs did not run
                        # until the end.
                        node_metric_dimensions['Result'] = jenkins_node.result
                    unix_timestamp = jenkins_node.start_timestamp
                    if jenkins_node.type == 'STAGE':
                        current_stage = process_stage(jenkins_node)
                    elif jenkins_node.type == 'PARALLEL':
                        process_parallel(jenkins_node)
                    else:
                        logging.error('Unknown node type: %s for %s', jenkins_node.type, jenkins_node)

    return True

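# Worked example for process_parallel() above (the step durations are made up):
# a PARALLEL node whose steps report 120000 ms, 45000 ms and 30000 ms yields
# parallel_duration_ms = 195000, which is published as int(195000 / 1000) = 195
# seconds under the 'Step Duration' metric.
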

def _dynamo_set_last_processed_jenkins_run_id(dynamo_db, jenkins_run):
    """
    Mark the passed Jenkins run as processed in the database. This allows us to avoid duplicate
    processing in the future. It's important that runs are processed from oldest to latest (and not
    in parallel) since we expect the 'last scanned run id' to only ever increase.
    :param dynamo_db: Boto DynamoDB handle
    :param jenkins_run: Jenkins run
    :return: Nothing
    """
    table = dynamo_db.Table(DYNAMODB_TABLE)
    table.update_item(
        Key={
            DYNAMO_KEY_FULL_JOB_NAME: jenkins_run.parent_job.full_job_name
        },
        UpdateExpression=f"set {DYNAMO_VALUE_LAST_SCANNED_RUN_ID} = :id",
        ExpressionAttributeValues={
            ':id': jenkins_run.run_id
        }
    )


def _dynamo_get_last_processed_jenkins_run_id(dynamo_table, jenkins_job_name):
    """
    Retrieve the id of the last processed run of the passed Jenkins job
    :param dynamo_table: Boto DynamoDB table handle
    :param jenkins_job_name: Full name of the Jenkins job
    :return: Last processed run id or None if the job has not been recorded yet
    """
    response = dynamo_table.get_item(
        Key={
            DYNAMO_KEY_FULL_JOB_NAME: jenkins_job_name
        }
    )

    if 'Item' in response and DYNAMO_VALUE_LAST_SCANNED_RUN_ID in response['Item']:
        return int(response['Item'][DYNAMO_VALUE_LAST_SCANNED_RUN_ID])

    logging.debug('%s has not been recorded yet', jenkins_job_name)
    return None


def _configure_logging():
    logging.basicConfig()
    logging.getLogger().setLevel(logging.INFO)
    logging.getLogger('botocore').setLevel(logging.INFO)
    logging.getLogger('boto3').setLevel(logging.INFO)
    logging.getLogger('urllib3').setLevel(logging.INFO)
    logging.getLogger('requests').setLevel(logging.ERROR)
    logging.getLogger('botocore.vendored.requests.packages.urllib3.connectionpool').setLevel(logging.ERROR)


def main():
    _configure_logging()
    aws_service_objects = aws_utils.generate_aws_service_objects(region_name=REGION_NAME)
    logging.info('Starting gathering statistics')
    record_jenkins_run_durations(dynamo_db=aws_service_objects.dynamo_db,
                                 cloudwatch=aws_service_objects.cloudwatch)
    logging.info('Statistics recorded')


def lambda_handler(event, context):
    # This try-except is important because we have to catch all exceptions. Otherwise, the
    # exceptions bubble up to Lambda and the service retries executing multiple times. We only
    # want exactly one execution per request.
    try:
        main()
        return 'Done'
    except Exception:  # pylint: disable=broad-except
        logging.exception('Unexpected exception')
        logging.fatal('Unexpected exception')
        return 'Error'


if __name__ == '__main__':
    sys.exit(main())

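# Illustrative state record produced by _dynamo_set_last_processed_jenkins_run_id()
# (the job name and run id below are hypothetical):
#
#   {'FULL_JOB_NAME': 'some-folder/some-job/some-branch', 'LAST_SCANNED_RUN_ID': 42}
#
# For a local run outside of Lambda, set the environment variables listed near
# the top of this file and execute:
#
#   python statistics.py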