def can_fetch_intermediate_output()

in airavata_django_portal_sdk/experiment_util/intermediate_output.py [0:0]


def can_fetch_intermediate_output(request, experiment: ExperimentModel, output_name: str) -> bool:
    """Return True if intermediate output can be fetched for the given named output."""
    # look at job status and check if currently running intermediate output process
    jobs: List[JobModel] = []
    process: ProcessModel
    task: TaskModel
    for process in experiment.processes:
        for task in process.tasks:
            for job in task.jobs:
                jobs.append(job)

    def latest_status_is_active(job: JobModel) -> bool:
        if not job.jobStatuses or len(job.jobStatuses) == 0:
            return False
        return job.jobStatuses[-1].jobState == JobState.ACTIVE
    # If there are no active jobs, return False
    if not any(map(latest_status_is_active, jobs)):
        return False

    try:
        # Return True if process status is in a terminal state
        process_status = get_intermediate_output_process_status(request, experiment, output_name)
        return process_status.state in [ProcessState.CANCELED, ProcessState.COMPLETED, ProcessState.FAILED]
    except Exception:
        # Return True since error here likely means that there is no currently running process
        return True