def fetch_status()

in analytics/circleci_analyze.py [0:0]


def fetch_status(branch=None, item_count=50):
    """Print a per-workflow cost report for the latest CircleCI pipelines.

    Fetches pipelines through ``CircleCICache``, walks every workflow and job
    of each pipeline, converts job credits into a price using
    ``price_per_credit`` and prints one summary per workflow together with
    the running totals.

    Args:
        branch: Branch filter forwarded to ``get_pipelines``; ``None`` is
            reported as "all branches".
        item_count: Number of latest pipelines requested from
            ``get_pipelines``.

    Returns:
        None. All results are written to stdout.
    """
    isatty = sys.stdout.isatty()
    padding = os.get_terminal_size().columns - 1 if isatty else None
    ci_cache = CircleCICache(token=get_circleci_token())
    print(f"About to fetch {item_count} latest pipelines against {branch if branch is not None else 'all branches'}")
    pipelines = ci_cache.get_pipelines(branch=branch, item_count=item_count)

    # Loop-invariant lookup tables, built once instead of per job/workflow.
    skipped_job_statuses = ('blocked', 'canceled', 'unauthorized', 'running', 'not_run', 'failing')
    master_branches = ('master', 'nightly', 'postnightly', 'release/1.6')

    total_price, total_master_price = 0, 0
    for pipeline_idx, pipeline in enumerate(pipelines):
        revision = pipeline['vcs']['revision']
        # Kept separate from the `branch` argument so the filter value is not
        # silently overwritten while iterating.
        pipeline_branch = pipeline['vcs']['branch']
        workflows = ci_cache.get_pipeline_workflows(pipeline['id'])
        # Job numbers already accounted for in this pipeline; seeing one again
        # in another workflow marks that workflow as a probable rerun.
        known_job_ids = set()
        for workflow in workflows:
            url = f'https://app.circleci.com/pipelines/github/pytorch/pytorch/{workflow["pipeline_number"]}/workflows/{workflow["id"]}'
            if is_workflow_in_progress(workflow):
                # Use the same padding/newline handling as the other status
                # lines (presumably so an overwritten line is fully cleared).
                print_line(f'Skipping {url} name:{workflow["name"]} status:{workflow["status"]}',
                           padding=padding, newline=not isatty)
                continue
            rerun = False
            total_credits, test_credits, gpu_credits, wincpu_credits, wingpu_credits = 0, 0, 0, 0, 0
            jobs = ci_cache.get_workflow_jobs(workflow['id'])
            for job in jobs:
                job_name, job_status, job_number = job['name'], job['status'], job.get('job_number', None)
                if job_status in skipped_job_statuses:
                    continue
                if job_number is None:
                    # Dump the raw job record so the unexpected entry can be inspected.
                    print(job)
                    continue
                if job_number in known_job_ids:
                    rerun = True
                    continue
                job_info = ci_cache.get_job(job['project_slug'], job_number)
                if 'executor' not in job_info:
                    print(f'executor not found in {job_info}')
                    continue
                job_executor = job_info['executor']
                resource_class = job_executor['resource_class']
                if resource_class is None:
                    print(f'resource_class is none for {job_info}')
                    continue
                job_on_gpu = 'gpu' in resource_class
                job_on_win = 'windows' in resource_class
                if job_status != 'infrastructure_fail':
                    duration = str2date(job_info['stopped_at']) - str2date(job_info['started_at'])
                    # NOTE(review): presumably `duration` is in milliseconds and the
                    # executor rate is in credits per minute - confirm.
                    job_credits = get_executor_price_rate(job_executor) * int(job_info['duration']) * 1e-3 / 60
                else:
                    # Infrastructure failures are counted at zero credits.
                    job_credits, duration = 0, 0
                job_cost = job_credits * price_per_credit
                total_credits += job_credits
                if 'test' in job_name or job_name.startswith('smoke_'):
                    test_credits += job_credits
                elif job_on_gpu:
                    # A non-test job on a GPU executor is called out explicitly.
                    print(f'Running build job {job_name} on GPU!!!')
                if job_on_gpu:
                    gpu_credits += job_credits
                    if job_on_win:
                        wingpu_credits += job_credits
                if job_on_win and not job_on_gpu:
                    wincpu_credits += job_credits
                known_job_ids.add(job_number)
                print_line(f'         {job_name} {job_status}  {duration} ${job_cost:.2f}',
                           padding=padding, newline=not isatty)
            # Increment totals
            workflow_price = total_credits * price_per_credit
            total_price += workflow_price
            if pipeline_branch in master_branches:
                total_master_price += workflow_price
            # skip small jobs
            if workflow_price < .1:
                continue
            # From here on workflow_price >= 0.1, so total_credits and
            # total_price are non-zero and the divisions below are safe.
            workflow_status = f'[{pipeline_idx}/{len(pipelines)}]'
            workflow_status += f' {url} {workflow["name"]} status:{workflow["status"]}'
            workflow_status += f' price: ${workflow_price:.2f}'
            if rerun:
                workflow_status += ' (Rerun?)'
            workflow_status += f'\n\t\tdate: {workflow["created_at"]} branch:{pipeline_branch} revision:{revision}'
            workflow_status += f'\n\t\ttotal credits: {int(total_credits)}'
            if test_credits != 0:
                workflow_status += f' testing: {100 * test_credits / total_credits:.1f}%'
            if gpu_credits != 0:
                workflow_status += f' GPU testing: {100 * gpu_credits / total_credits:.1f}%'
                if wingpu_credits != 0:
                    workflow_status += f' WINGPU/GPU: {100 * wingpu_credits / gpu_credits:.1f}%'

            if wincpu_credits != 0:
                workflow_status += f' Win CPU: {100 * wincpu_credits / total_credits:.1f}%'
            workflow_status += f' Total: ${total_price:.2f} master fraction: {100 * total_master_price/ total_price:.1f}%'
            print_line(workflow_status, padding=padding)