in src/backend/main.py [0:0]
def extract_prediction_relaxation_tasks(_pipe, _client_pipeline):
    """Pair every 'predict' task of a pipeline job with its relaxation output.

    Fetches the pipeline job named ``_pipe.name`` through ``_client_pipeline``
    and walks ``job_detail.task_details``. For each task whose ``task_name`` is
    exactly ``'predict'`` it records the raw prediction artifact and, when one
    can be located, the URI of the matching ``'relaxed_protein'`` artifact.

    The relaxation lookup is two-step:
      1. find the first task whose name contains ``'condition'`` and that has
         the same ``parent_task_id`` as the predict task;
      2. take the first task whose ``parent_task_id`` is that condition task's
         id and read its ``'relaxed_protein'`` output, if it has one.

    Args:
        _pipe: Object exposing ``name``, used as the pipeline job resource name.
        _client_pipeline: Client exposing ``get_pipeline_job(request)``.

    Returns:
        A list with one dict per predict task, with keys ``task_id``,
        ``task_name``, ``parent_task_id``, ``model_name``,
        ``ranking_confidence`` (0 when the artifact metadata has none),
        ``predict_uri`` and ``relax_uri``. Both URIs are passed through
        ``reformatBucketUri``; ``relax_uri`` is passed as ``None`` when no
        relaxation output was found.

    Raises:
        KeyError / IndexError: if a predict task lacks the
            ``'input:model_name'`` metadata entry or a ``'raw_prediction'``
            artifact (presumably when the task has not completed - verify
            against callers).
    """
    get_request = vertex_ai2.GetPipelineJobRequest(name=_pipe.name)  # pipe.name = pipeline ID
    pipeline_job = _client_pipeline.get_pipeline_job(get_request)
    task_details = pipeline_job.job_detail.task_details

    predict_tasks = [task for task in task_details if task.task_name == 'predict']
    potential_relax_tasks = [task for task in task_details if 'condition' in task.task_name]

    formatted_predict_relax_tasks = []
    for predict_task in predict_tasks:
        task_id = predict_task.task_id
        parent_task_id = predict_task.parent_task_id
        print(f'Predict parent_task_id: {parent_task_id}')

        model_name = predict_task.execution.metadata['input:model_name']

        # Resolve the prediction artifact once instead of once per field.
        raw_prediction = predict_task.outputs['raw_prediction'].artifacts[0]
        ranking_confidence = raw_prediction.metadata.get('ranking_confidence')
        if ranking_confidence is None:
            ranking_confidence = 0
        predict_uri = raw_prediction.uri

        # Get relaxation: the first 'condition' task that is a sibling of this
        # predict task (same parent), or None when there is no such task.
        relax_task_id = next(
            (
                task.task_id
                for task in potential_relax_tasks
                if task.parent_task_id == parent_task_id
            ),
            None,
        )

        # Only the FIRST child of the condition task is inspected; if it has no
        # 'relaxed_protein' output, relax_uri stays None.
        relax_uri = None
        relax_child = next(
            (task for task in task_details if task.parent_task_id == relax_task_id),
            None,
        )
        if relax_child is not None:
            if 'relaxed_protein' in relax_child.outputs:
                relax_uri = relax_child.outputs['relaxed_protein'].artifacts[0].uri
            print(f'Relax uri: {relax_uri}')

        formatted_predict_relax_tasks.append(
            {
                'task_id': task_id,
                'task_name': predict_task.task_name,
                'parent_task_id': parent_task_id,
                'model_name': model_name,
                'ranking_confidence': ranking_confidence,
                'predict_uri': reformatBucketUri(predict_uri),
                'relax_uri': reformatBucketUri(relax_uri),
            }
        )
    return formatted_predict_relax_tasks