in analytics/circleci_analyze.py [0:0]
def compute_covariance(branch='master', name_filter: Optional[Callable[[str], bool]] = None):
    import numpy as np
    revisions: MutableSet[str] = set()
    job_summary: Dict[str, Dict[str, float]] = {}

    # Extract data
    print(f"Computing covariance for {branch if branch is not None else 'all branches'}")
    ci_cache = CircleCICache(None)
    pipelines = ci_cache.get_pipelines(branch=branch)
    for pipeline in pipelines:
        if pipeline['trigger']['type'] == 'schedule':
            continue
        revision = pipeline['vcs']['revision']
        pipeline_jobs: Dict[str, float] = {}
        blocked_jobs: MutableSet[str] = set()
        workflows = ci_cache.get_pipeline_workflows(pipeline['id'])
        for workflow in workflows:
            if is_workflow_in_progress(workflow):
                continue
            jobs = ci_cache.get_workflow_jobs(workflow['id'])
            for job in jobs:
                job_name = job['name']
                job_status = job['status']
                # Handle job renames
                if job_name == 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_NO_AVX2_test':
                    job_name = 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_nogpu_NO_AVX2_test'
                if job_name == 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_NO_AVX_NO_AVX2_test':
                    job_name = 'pytorch_linux_xenial_cuda10_1_cudnn7_py3_nogpu_NO_AVX_test'
                if job_status in ['infrastructure_fail', 'canceled']:
                    continue
                if callable(name_filter) and not name_filter(job_name):
                    continue
                if job_status == 'blocked':
                    blocked_jobs.add(job_name)
                    continue
                if job_name in blocked_jobs:
                    blocked_jobs.remove(job_name)
                # Encode job outcome as +1.0 (success) / -1.0 (failure)
                result = 1.0 if job_status == 'success' else -1.0
                pipeline_jobs[job_name] = result
        # Skip pipelines with blocked jobs (which usually means the build failed due to a test failure)
        if len(blocked_jobs) != 0:
            continue
        # Skip pipelines where every job succeeded
        if all(result == 1.0 for result in pipeline_jobs.values()):
            continue
        revisions.add(revision)
        for job_name in pipeline_jobs:
            if job_name not in job_summary:
                job_summary[job_name] = {}
            job_summary[job_name][revision] = pipeline_jobs[job_name]

    # Analyze results
    job_names = sorted(job_summary.keys())
    # revisions = sorted(revisions)
    job_data = np.zeros((len(job_names), len(revisions)), dtype=float)
    print(f"Number of observations: {len(revisions)}")
    for job_idx, job_name in enumerate(job_names):
        job_row = job_summary[job_name]
        for rev_idx, revision in enumerate(revisions):
            if revision in job_row:
                job_data[job_idx, rev_idx] = job_row[revision]
        # Success rate over the revisions where the job actually ran
        success_rate = job_data[job_idx].sum(where=job_data[job_idx] > 0.0) / len(job_row)
        present_rate = 1.0 * len(job_row) / len(revisions)
        print(f"{job_name}: missing {100.0 * (1.0 - present_rate):.2f}% success rate: {100 * success_rate:.2f}%")
    cov_matrix = np.corrcoef(job_data)
    plot_heatmap(cov_matrix, job_names)
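

# A minimal invocation sketch (illustrative only, not part of the original module):
# name_filter lets you restrict the correlation matrix to a subset of jobs, e.g.
# only test jobs. The regex, the lambda, and the __main__ guard below are
# assumptions for demonstration.
if __name__ == '__main__':
    import re
    test_job_pattern = re.compile(r'_test\d*$')
    # Correlate only jobs whose names end in "_test" (optionally with a shard number)
    compute_covariance(branch='master',
                       name_filter=lambda name: test_job_pattern.search(name) is not None)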