in api-server/tesazure/tesapi/api.py [0:0]
def get(self, task_id):
task = TesTask.get_by_id(task_id)
if not task:
msg = f"TES task '{task_id}' was not found"
current_app.logger.error(msg)
return {'errors': msg}, 404
if current_app.config['AAD_VERIFY'] is True:
if current_app.config['TASK_ACCESS_RESTRICTIONS'] is not None and task.tenant_id not in [None, jwt_claims['tid']]:
current_app.logger.error("Authorization required to view task")
return {'errors': 'Authorization required to view task'}, 403
if current_app.config['TASK_ACCESS_RESTRICTIONS'] == 'per-user' and task.user_id not in [None, jwt_claims['sub']]:
current_app.logger.error("Authorization required to view task")
return {'errors': 'Authorization required to view task'}, 403
else:
if current_app.config['TASK_ACCESS_RESTRICTIONS'] is not None and any(None is not x for x in [task.tenant_id, task.user_id]):
current_app.logger.error("Authorization required to view task")
return {'errors': 'Authorization required to view task'}, 403
schema = TesTaskSchema()
# merge backend status to db
backend_task = compute_backend.backend.get_task(task.backend_id)
if not backend_task:
current_app.logger.error(f"Task status failed: TES task '{task_id}' exists in local cache, but compute backend could not locate backend_id={task.backend_id}")
return {'errors': f"TES task '{task_id}' exists in local cache, but compute backend could not locate it"}, 500
for field_name in schema._declared_fields:
field_value = getattr(backend_task, field_name)
if field_value:
setattr(task, field_name, field_value)
task.save()
return schema.dump(task)