in analytics/circleci_analyze.py [0:0]
def fetch_status(branch=None, item_count=50):
    """Print a per-workflow credit/cost report for recent CircleCI pipelines.

    Pipelines are fetched through ``CircleCICache``; for every workflow that
    is no longer in progress, the credits of its jobs are summed and a
    summary line is printed (workflows cheaper than $0.10 are not reported,
    but still count towards the running totals).

    Args:
        branch: Branch filter forwarded to ``CircleCICache.get_pipelines``;
            ``None`` means pipelines of all branches are requested.
        item_count: Number of pipelines to request, forwarded to
            ``CircleCICache.get_pipelines``.

    Returns:
        None. All results are written to stdout.
    """
    # Branches whose spend is accumulated into the "master fraction" figure.
    master_branches = ('master', 'nightly', 'postnightly', 'release/1.6')
    # Jobs in these states are ignored when accumulating credits.
    skipped_job_statuses = ('blocked', 'canceled', 'unauthorized', 'running', 'not_run', 'failing')

    isatty = sys.stdout.isatty()
    # Terminal width is only queried when stdout is a terminal.
    padding = os.get_terminal_size().columns - 1 if isatty else None
    ci_cache = CircleCICache(token=get_circleci_token())
    print(f"About to fetch {item_count} latest pipelines against {branch if branch is not None else 'all branches'}")
    pipelines = ci_cache.get_pipelines(branch=branch, item_count=item_count)
    total_price, total_master_price = 0, 0
    for pipeline_idx, pipeline in enumerate(pipelines):
        vcs = pipeline['vcs']
        revision = vcs['revision']
        # Kept in its own local so the ``branch`` argument is not clobbered.
        # NOTE(review): pipelines not triggered from a branch (e.g. tags) may
        # carry no 'branch' key, hence .get() -- confirm against the API schema.
        pipeline_branch = vcs.get('branch')
        workflows = ci_cache.get_pipeline_workflows(pipeline['id'])
        # Job numbers already billed within this pipeline; seeing one again
        # marks the workflow as a probable rerun and avoids double counting.
        known_job_ids = set()
        for workflow in workflows:
            url = f'https://app.circleci.com/pipelines/github/pytorch/pytorch/{workflow["pipeline_number"]}/workflows/{workflow["id"]}'
            if is_workflow_in_progress(workflow):
                # Same padding/newline handling as the per-job progress line below.
                print_line(f'Skipping {url} name:{workflow["name"]} status:{workflow["status"]}',
                           padding=padding, newline=not isatty)
                continue
            rerun = False
            total_credits, test_credits, gpu_credits, wincpu_credits, wingpu_credits = 0, 0, 0, 0, 0
            jobs = ci_cache.get_workflow_jobs(workflow['id'])
            for job in jobs:
                job_name, job_status, job_number = job['name'], job['status'], job.get('job_number')
                if job_status in skipped_job_statuses:
                    continue
                if job_number is None:
                    # Dump the raw record so the unexpected job can be inspected.
                    print(job)
                    continue
                if job_number in known_job_ids:
                    rerun = True
                    continue
                job_info = ci_cache.get_job(job['project_slug'], job_number)
                if 'executor' not in job_info:
                    print(f'executor not found in {job_info}')
                    continue
                job_executor = job_info['executor']
                resource_class = job_executor['resource_class']
                if resource_class is None:
                    print(f'resource_class is none for {job_info}')
                    continue
                job_on_gpu = 'gpu' in resource_class
                job_on_win = 'windows' in resource_class
                if job_status != 'infrastructure_fail':
                    duration = str2date(job_info['stopped_at']) - str2date(job_info['started_at'])
                    # presumably 'duration' is in milliseconds and the rate is
                    # credits per minute -- TODO confirm
                    job_credits = get_executor_price_rate(job_executor) * int(job_info['duration']) * 1e-3 / 60
                else:
                    # Infrastructure failures are not charged.
                    job_credits, duration = 0, 0
                job_cost = job_credits * price_per_credit
                total_credits += job_credits
                if 'test' in job_name or job_name.startswith('smoke_'):
                    test_credits += job_credits
                elif job_on_gpu:
                    print(f'Running build job {job_name} on GPU!!!')
                if job_on_gpu:
                    gpu_credits += job_credits
                    if job_on_win:
                        wingpu_credits += job_credits
                if job_on_win and not job_on_gpu:
                    wincpu_credits += job_credits
                known_job_ids.add(job_number)
                print_line(f' {job_name} {job_status} {duration} ${job_cost:.2f}',
                           padding=padding, newline=not isatty)
            # Increment totals
            workflow_price = total_credits * price_per_credit
            total_price += workflow_price
            if pipeline_branch in master_branches:
                total_master_price += workflow_price
            # skip small jobs
            if workflow_price < .1:
                continue
            # From here on total_credits and total_price are strictly
            # positive, so the percentage divisions below are safe.
            status_parts = [
                f'[{pipeline_idx}/{len(pipelines)}]',
                f' {url} {workflow["name"]} status:{workflow["status"]}',
                f' price: ${workflow_price:.2f}',
                ' (Rerun?)' if rerun else '',
                f'\n\t\tdate: {workflow["created_at"]} branch:{pipeline_branch} revision:{revision}',
                f'\n\t\ttotal credits: {int(total_credits)}',
            ]
            if test_credits != 0:
                status_parts.append(f' testing: {100 * test_credits / total_credits:.1f}%')
            if gpu_credits != 0:
                status_parts.append(f' GPU testing: {100 * gpu_credits / total_credits:.1f}%')
                if wingpu_credits != 0:
                    status_parts.append(f' WINGPU/GPU: {100 * wingpu_credits / gpu_credits:.1f}%')
            if wincpu_credits != 0:
                status_parts.append(f' Win CPU: {100 * wincpu_credits / total_credits:.1f}%')
            status_parts.append(
                f' Total: ${total_price:.2f} master fraction: {100 * total_master_price / total_price:.1f}%')
            print_line(''.join(status_parts), padding=padding)