def compute_covariance()

in analytics/circleci_analyze.py
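
The function relies on type hints and helper utilities assumed to be available at module level. A minimal sketch of that context, reconstructed from usage in the body rather than copied from the file header:

from typing import Callable, Dict, MutableSet, Optional

# CircleCICache, is_workflow_in_progress and plot_heatmap are assumed to be
# defined (or imported) elsewhere in analytics/circleci_analyze.py.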


def compute_covariance(branch: Optional[str] = 'master', name_filter: Optional[Callable[[str], bool]] = None) -> None:
    import numpy as np
    revisions: MutableSet[str] = set()
    job_summary: Dict[str, Dict[str, float]] = {}

    # Extract data
    print(f"Computing covariance for {branch if branch is not None else 'all branches'}")
    ci_cache = CircleCICache(None)
    pipelines = ci_cache.get_pipelines(branch=branch)
    for pipeline in pipelines:
        if pipeline['trigger']['type'] == 'schedule':
            continue
        revision = pipeline['vcs']['revision']
        pipeline_jobs: Dict[str, float] = {}
        blocked_jobs: MutableSet[str] = set()
        workflows = ci_cache.get_pipeline_workflows(pipeline['id'])
        for workflow in workflows:
            if is_workflow_in_progress(workflow):
                continue
            jobs = ci_cache.get_workflow_jobs(workflow['id'])
            for job in jobs:
                job_name = job['name']
                job_status = job['status']
                # Handle renames
                if job_name == 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_NO_AVX2_test':
                    job_name = 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_nogpu_NO_AVX2_test'
                if job_name == 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_NO_AVX_NO_AVX2_test':
                    job_name = 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_nogpu_NO_AVX_test'
                if job_status in ['infrastructure_fail', 'canceled']:
                    continue
                if callable(name_filter) and not name_filter(job_name):
                    continue
                if job_status == 'blocked':
                    blocked_jobs.add(job_name)
                    continue
                if job_name in blocked_jobs:
                    blocked_jobs.remove(job_name)
                # Encode success as +1.0 and failure as -1.0; jobs missing from a
                # revision remain 0.0 in the matrix built below
                result = 1.0 if job_status == 'success' else -1.0
                pipeline_jobs[job_name] = result
        # Skip builds that still have blocked jobs (which usually means the build failed due to a test failure)
        if len(blocked_jobs) != 0:
            continue
        # Skip pipelines in which every recorded job succeeded
        if all(result == 1.0 for result in pipeline_jobs.values()):
            continue
        revisions.add(revision)
        for job_name in pipeline_jobs:
            if job_name not in job_summary:
                job_summary[job_name] = {}
            job_summary[job_name][revision] = pipeline_jobs[job_name]
    # Analyze results
    job_names = sorted(job_summary.keys())
    # revisions = sorted(revisions)
    # NB: np.float was deprecated in NumPy 1.20 and removed in 1.24; use the builtin float instead
    job_data = np.zeros((len(job_names), len(revisions)), dtype=float)
    print(f"Number of observations: {len(revisions)}")
    for job_idx, job_name in enumerate(job_names):
        job_row = job_summary[job_name]
        for rev_idx, revision in enumerate(revisions):
            if revision in job_row:
                job_data[job_idx, rev_idx] = job_row[revision]
        success_rate = job_data[job_idx].sum(where=job_data[job_idx] > 0.0) / len(job_row)
        present_rate = 1.0 * len(job_row) / len(revisions)
        print(f"{job_name}: missing: {100.0 * (1.0 - present_rate):.2f}%, success rate: {100 * success_rate:.2f}%")
    # np.corrcoef returns the Pearson correlation matrix (i.e. normalized covariance) between job rows
    cov_matrix = np.corrcoef(job_data)
    plot_heatmap(cov_matrix, job_names)
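
A hedged usage sketch: restricting the analysis to CUDA test jobs on master. The filter predicate below is illustrative and not part of the module; any Callable[[str], bool] works.

# Illustrative call: only correlate CUDA test jobs on master.
# The lambda is a hypothetical example of a name_filter.
compute_covariance(branch='master',
                   name_filter=lambda name: 'cuda' in name and 'test' in name)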