def generate_ci_report()

in services/daily-ci-reports/report.py [0:0]


def _jenkins_job_url(job_name):
    """Return the Jenkins URL of a job, expanding '/' in its name to '/job/'."""
    # Jenkins addresses a job nested in folders as job/<folder>/job/<name>,
    # so every path separator in the full name has to become '/job/'.
    job_path = '/job/'.join(job_name.split('/'))
    return f'{JENKINS_URL}job/{job_path}'


def _count_run_results(test_result, runs):
    """Add the FAILURE and SUCCESS runs in ``runs`` to ``test_result``'s counters."""
    for run in runs:
        result = run['result']
        if result == 'FAILURE':
            test_result.num_failed += 1
        elif result == 'SUCCESS':
            test_result.num_passed += 1
        # Any other result value is deliberately left uncounted.


def generate_ci_report(start, end):
    """
    Generate and send the email report for CI results

    Collects one TestResults entry per enabled branch of every multi-branch
    pipeline and one per plain pipeline, counting the failed and successful
    runs that ``filter_runs`` keeps for the given window. The entries are
    passed through ``explicit_filter_and_group``, rendered with
    ``ci_report_template`` and mailed via ``send_email``.

    :param start: Start of the reporting window; also used as the report date
                  and for the email title (its ``month``, ``day`` and ``year``
                  attributes are read)
    :param end: End of the reporting window
    :return: Nothing
    :raises Exception: If a pipeline is neither a MultiBranchPipeline nor a
                       Pipeline
    """
    test_results = []

    for pipeline in JenkinsQuery().all_pipelines():
        # NOTE(review): MultiBranchPipeline is tested first, presumably because
        # it may also be a Pipeline - keep this order.
        if isinstance(pipeline, MultiBranchPipeline):
            # Unique names of the branches that pass the generic branch filter
            # and are explicitly enabled for reporting.
            branch_names = sorted({
                branch['name'] for branch in pipeline.all_branches()
                if Pipeline.filter_branch_name(branch['name'])
                and branch['name'] in ENABLED_BRANCHES
            })

            if not branch_names:
                logging.warning(f'no branches for {pipeline.name} found')
                continue

            # The job URL is the same for every branch, so build it once.
            job_url = _jenkins_job_url(pipeline.name)

            for branch in branch_names:
                test_result = TestResults(job=pipeline.name,
                                          job_url=job_url,
                                          branch=branch,
                                          branch_url=f'{job_url}/job/{branch}')
                runs = pipeline.filter_runs(pipeline.all_branch_runs(branch), start=start, end=end)
                _count_run_results(test_result, runs)
                pprint(test_result)
                test_results.append(test_result)

        elif isinstance(pipeline, Pipeline):
            # Same URL scheme as the multi-branch case, so pipelines that live
            # inside folders get a working link as well.
            test_result = TestResults(job=pipeline.name, job_url=_jenkins_job_url(pipeline.name))
            runs = pipeline.filter_runs(pipeline.all_runs(), start=start, end=end)
            _count_run_results(test_result, runs)
            pprint(test_result)
            test_results.append(test_result)

        else:
            raise Exception('Unknown pipeline type!')

    test_results = explicit_filter_and_group(test_results)

    ci_report_html_output = ci_report_template.render(
        test_results=test_results,
        report_date=start)

    send_email(title=f'[Daily CI Report] {start.month:d}/{start.day:d}/{start.year:d}', sender=EMAIL_SENDER,
               recipient=EMAIL_RECEIVER, html_body=ci_report_html_output)