in api-server/tesazure/backends/batch/__init__.py [0:0]
def get_task(self, task_id):
    """Build a TES task view of the Azure Batch job identified by ``task_id``.

    The Batch job is fetched and translated into a ``tesmodels.TesTask``: the
    job state is mapped to a TES status, then refined using the job
    preparation/release task results and the per-task execution results. One
    ``tesmodels.ExecutorLog`` is recorded for every Batch task in the job.

    Args:
        task_id: Identifier passed to ``batch_client.job.get``.

    Returns:
        The populated ``tesmodels.TesTask``, or ``False`` when Batch reports
        the error code ``JobNotFound`` for ``task_id``.

    Raises:
        azbatch.models.BatchErrorException: For any Batch error other than
            ``JobNotFound`` raised while fetching the job.
    """
    try:
        batch_job = self.batch_client.job.get(task_id)
    except azbatch.models.BatchErrorException as e:
        if e.error.code == "JobNotFound":
            return False
        raise

    tes_task = tesmodels.TesTask()
    tes_task.creation_time = batch_job.creation_time

    tes_log = tesmodels.TaskLog()
    tes_log.start_time = batch_job.creation_time
    if batch_job.state == azbatch.models.JobState.completed:
        tes_log.end_time = batch_job.state_transition_time

    # Default state inheritance, with 'active' as QUEUED unless we get more detailed info from tasks
    state_map = {
        azbatch.models.JobState.active: tesmodels.TaskStatus.QUEUED,
        azbatch.models.JobState.completed: tesmodels.TaskStatus.COMPLETE,
        azbatch.models.JobState.deleting: tesmodels.TaskStatus.CANCELED,
        azbatch.models.JobState.disabled: tesmodels.TaskStatus.PAUSED,
        azbatch.models.JobState.disabling: tesmodels.TaskStatus.PAUSED,
        azbatch.models.JobState.enabling: tesmodels.TaskStatus.PAUSED,
        azbatch.models.JobState.terminating: tesmodels.TaskStatus.RUNNING,
    }
    tes_task.state = state_map.get(batch_job.state, tesmodels.TaskStatus.UNKNOWN)

    # Check prep and release task status, but only if they exist
    if batch_job.job_preparation_task or batch_job.job_release_task:
        for task_info in self.batch_client.job.list_preparation_and_release_task_status(batch_job.id):
            prep_info = task_info.job_preparation_task_execution_info
            if prep_info:
                # System error if file marshalling failed
                if prep_info.result == azbatch.models.TaskExecutionResult.failure:
                    tes_task.state = tesmodels.TaskStatus.SYSTEM_ERROR
                # Initializing when files are being downloaded
                if prep_info.state == azbatch.models.JobPreparationTaskState.running:
                    tes_task.state = tesmodels.TaskStatus.INITIALIZING

            release_info = task_info.job_release_task_execution_info
            if release_info:
                # System error if file marshalling failed
                if release_info.result == azbatch.models.TaskExecutionResult.failure:
                    tes_task.state = tesmodels.TaskStatus.SYSTEM_ERROR
                # Active if files are being uploaded
                if release_info.state == azbatch.models.JobReleaseTaskState.running:
                    tes_task.state = tesmodels.TaskStatus.RUNNING

    # Check status against batch task summary
    # NOTE(review): terminate_reason is compared against a TaskExecutionResult member here;
    # confirm this matches the reason string actually set when the job is terminated.
    if batch_job.execution_info.terminate_reason == azbatch.models.TaskExecutionResult.failure:
        tes_task.state = tesmodels.TaskStatus.EXECUTOR_ERROR

    # NOTE(review): the state is only still UNKNOWN here when batch_job.state is not a key of
    # state_map, so the task counts are not consulted for any of the job states mapped above.
    if tes_task.state == tesmodels.TaskStatus.UNKNOWN:
        task_status = self.batch_client.job.get_task_counts(batch_job.id)
        if task_status.failed:
            tes_task.state = tesmodels.TaskStatus.EXECUTOR_ERROR
        elif task_status.running == 0 and task_status.active > 0:
            tes_task.state = tesmodels.TaskStatus.QUEUED

    # Get the executor logs (each batch task)
    for batch_task in self.batch_client.task.list(batch_job.id):
        if batch_task.execution_info.result == azbatch.models.TaskExecutionResult.failure:
            # Above assumes EXECUTOR_ERROR on any task failure. Since we are responsible for orchestrating Batch,
            # Batch USER_ERROR should be a TES-user-facing SYSTEM_ERROR regardless of if
            # batch_task.execution_info.failure_info.category is azbatch.models.ErrorCategory.server_error or
            # azbatch.models.ErrorCategory.user_error.
            tes_task.state = tesmodels.TaskStatus.SYSTEM_ERROR

        # Executor logs
        executor_log = tesmodels.ExecutorLog(start_time=batch_task.creation_time, end_time=batch_task.state_transition_time)
        executor_log.exit_code = batch_task.execution_info.exit_code

        # Add output streams to executor logs (only available if job is completed)
        if batch_job.state == azbatch.models.JobState.completed:
            # The file content arrives as a sequence of byte chunks. A chunk boundary need not fall on a
            # character boundary, so decoding chunk-by-chunk can fail on a split multi-byte UTF-8 sequence.
            # Join the raw bytes first and decode once; nothing is appended when no data was received.
            with _handle_batch_error(should_raise=False):
                stdout_stream = self.batch_client.file.get_from_task(batch_job.id, batch_task.id, 'stdout.txt')
                stdout_bytes = b''.join(stdout_stream)
                if stdout_bytes:
                    executor_log.stdout += stdout_bytes.decode('utf-8')

            with _handle_batch_error(should_raise=False):
                stderr_stream = self.batch_client.file.get_from_task(batch_job.id, batch_task.id, 'stderr.txt')
                stderr_bytes = b''.join(stderr_stream)
                if stderr_bytes:
                    executor_log.stderr += stderr_bytes.decode('utf-8')

        # Rebind to a new list rather than appending in place (kept from the original; presumably
        # avoids mutating a shared default list on the model - confirm against tesmodels).
        tes_log.logs = tes_log.logs + [executor_log]

    # FIXME: tes_task is not loaded from db so outputs is empty here
    for output in tes_task.outputs:
        output_log = tesmodels.OutputFileLog()
        output_log.path = output.path
        output_log.url = output.url
        output_log.size_bytes = 0  # TODO
        tes_log.outputs = tes_log.outputs + [output_log]

    tes_task.logs = [tes_log]
    return tes_task