in airavata_django_portal_sdk/experiment_util/intermediate_output.py [0:0]
def can_fetch_intermediate_output(request, experiment: ExperimentModel, output_name: str) -> bool:
    """Return True if intermediate output can be fetched for the given named output.

    Fetching is allowed only when at least one job of the experiment is
    currently active and no intermediate output process for ``output_name``
    is still in progress.

    Args:
        request: passed through unchanged to
            ``get_intermediate_output_process_status``.
        experiment: experiment whose processes, tasks and jobs are inspected.
        output_name: name of the output, passed through to
            ``get_intermediate_output_process_status``.

    Returns:
        False when no job has ``JobState.ACTIVE`` as the last entry of its
        ``jobStatuses``. Otherwise True when the intermediate output process
        state is CANCELED, COMPLETED or FAILED, or when its status could not
        be retrieved; False when the process is in any other state.
    """
    import logging

    # Gather every job in the experiment. A missing (None) list at any level
    # of the process -> task -> job hierarchy is treated as empty, the same
    # way jobStatuses is treated below, so that an experiment without any
    # processes yet yields False instead of raising TypeError.
    jobs: List[JobModel] = [
        job
        for process in (experiment.processes or ())
        for task in (process.tasks or ())
        for job in (task.jobs or ())
    ]

    def latest_status_is_active(job: JobModel) -> bool:
        # The last entry of jobStatuses is taken as the job's latest status
        # (NOTE(review): assumes statuses are stored oldest-first - confirm).
        if not job.jobStatuses:
            return False
        return job.jobStatuses[-1].jobState == JobState.ACTIVE

    # If there are no active jobs, there is nothing to fetch from.
    if not any(map(latest_status_is_active, jobs)):
        return False

    try:
        process_status = get_intermediate_output_process_status(request, experiment, output_name)
    except Exception:
        # Deliberate best effort: an error here likely means that there is no
        # currently running intermediate output process, so fetching is
        # allowed. Record the error so unexpected failures remain diagnosable.
        logging.getLogger(__name__).debug(
            "Could not get intermediate output process status for output %r",
            output_name,
            exc_info=True,
        )
        return True

    # A previous fetch that reached a terminal state no longer blocks a new one.
    return process_status.state in (
        ProcessState.CANCELED,
        ProcessState.COMPLETED,
        ProcessState.FAILED,
    )