in api-server/tesazure/backends/batch/__init__.py [0:0]
def createJob(self, task):
"""Create new job and execute workflow."""
    # Defensive check: ensure the freshly generated job id does not collide
    # with an existing Azure Batch job
job_id = str(uuid.uuid4())
existing_job_ids = [j.id for j in self.batch_client.job.list()]
if job_id in existing_job_ids:
current_app.logger.warning(f"Batch job {job_id} already exists!")
return False
current_app.logger.info(f'Creating Azure Batch job {job_id}')
# TODO: Make a script in the container and pass inputs instead of command line
download_commands = backend_common.commands.generate_input_download_commands(task)
upload_commands = backend_common.commands.generate_output_upload_commands(task)
# Upload TES inputs passed as 'content' to blob, then download them into
# the containers during prep. FIXME: Need a cleanup routine for this
container_name = current_app.config['BATCH_STORAGE_TMP_CONTAINER_NAME']
    blob_service = azblob.BlockBlobService(
        account_name=current_app.config['STORAGE_ACCOUNT_NAME'],
        account_key=current_app.config['STORAGE_ACCOUNT_KEY']
    )
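    # create_container is idempotent: with the default fail_on_exist=False it
    # returns False rather than raising when the container already exists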
blob_service.create_container(container_name)
    for task_input in task.inputs:
        if task_input.content is None:
            continue
        blob_filename = str(uuid.uuid4())
        blob_service.create_blob_from_text(container_name, blob_filename, task_input.content)
        # Issue a read-only SAS token valid for one hour so the compute node
        # can download the blob without storage account credentials
        token = blob_service.generate_blob_shared_access_signature(
            container_name,
            blob_filename,
            azblob.BlobPermissions.READ,
            datetime.datetime.utcnow() + datetime.timedelta(hours=1),
        )
        url = blob_service.make_blob_url(container_name, blob_filename, sas_token=token)
        download_commands.append(backend_common.commands.generate_input_download_command(url, task_input.path))
# Pick an appropriate VM size and create the pool as necessary
vm_size = backend_common.determine_azure_vm_for_task(task.resources)
override_pool_id = current_app.config.get('BATCH_OVERRIDE_POOL_ID', None)
task_override_pool_id = task.tags.get('ms-backend-batch-pool-id', None)
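    # A pool override from the task's tags takes precedence over the app config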
    if task_override_pool_id:
        current_app.logger.info(f"Using pool override '{task_override_pool_id}' from task tag for batch job '{job_id}'")
        pool_info = azbatch.models.PoolInformation(pool_id=task_override_pool_id)
elif override_pool_id:
current_app.logger.info(f"Using pool override '{override_pool_id}' from app config for batch job '{job_id}'")
pool_info = azbatch.models.PoolInformation(pool_id=override_pool_id)
else:
current_app.logger.info(f"Auto-pool to be created with batch job {job_id}")
# Tasks timeout if pulls take >60s, so pull the images at pool creation time
executor_image_names = list(set(executor.image for executor in task.executors))
pool_info = azbatch.models.PoolInformation(
auto_pool_specification=azbatch.models.AutoPoolSpecification(
auto_pool_id_prefix='tes',
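                # 'job' lifetime deletes the auto-pool once the job finishes,
                # unless keep_alive is set below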
pool_lifetime_option='job',
keep_alive=current_app.config.get('BATCH_AUTOPOOL_KEEPALIVE', False),
pool=self._initializePoolSpec(vm_size, executor_image_names=executor_image_names)
)
)
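    # Let Batch terminate the job once all tasks complete (releasing any
    # job-lifetime auto-pool) and apply each task's exit options on failure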
job = azbatch.models.JobAddParameter(
id=job_id,
pool_info=pool_info,
on_all_tasks_complete=azbatch.models.OnAllTasksComplete.terminate_job,
on_task_failure=azbatch.models.OnTaskFailure.perform_exit_options_job_action
)
current_app.logger.info(f'Adding preparation task for job {job_id}')
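    # Mount the parent of the per-task directory into each container at /tes-wd
    # so the prep task, executors and release task share one working directory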
task_container_run_options = '-v "$AZ_BATCH_TASK_DIR/../:/tes-wd"'
    if current_app.config.get('BATCH_STORAGE_FILESHARE_NAME'):
        azfiles_path = "/mnt/batch/tasks/shared-azfiles"
        task_container_run_options += f' -v "{azfiles_path}:/tes-wd/shared-global"'
fileprep_task_container_run_options = '--entrypoint=/bin/sh ' + task_container_run_options
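    # 'true' is a shell no-op so the prep command line stays valid when the
    # task has nothing to download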
download_commands_shell = ';'.join(download_commands) if download_commands else 'true'
job.job_preparation_task = azbatch.models.JobPreparationTask(
container_settings=azbatch.models.TaskContainerSettings(
image_name=current_app.config.get('FILETRANSFER_CONTAINER_IMAGE'),
container_run_options=fileprep_task_container_run_options
),
        # mkdir ensures the folder has the right permissions; if Docker
        # creates it via the -v mount syntax it is owned by root and
        # permission errors ensue. The bare '-c' works because the container
        # run options set --entrypoint=/bin/sh
command_line=f""" -c "set -e; set -o pipefail; mkdir -p /tes-wd/shared; {download_commands_shell}; wait" """,
)
    if upload_commands:
        # The prep task is always present (at minimum for the mkdir above);
        # if this logic changes later, recall that a release task cannot be
        # specified without a prep task
current_app.logger.info(f'Adding release task for job {job_id}')
job.job_release_task = azbatch.models.JobReleaseTask(
container_settings=azbatch.models.TaskContainerSettings(
image_name=current_app.config.get('FILETRANSFER_CONTAINER_IMAGE'),
container_run_options=fileprep_task_container_run_options
),
command_line=f""" -c "set -e; set -o pipefail; {';'.join(upload_commands)}; wait" """
)
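    # Submit the job, then add one Batch task per TES executor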
with _handle_batch_error():
self.batch_client.job.add(job)
for executor in task.executors:
task_id = str(uuid.uuid4())
current_app.logger.info(f'Creating Azure Batch task {task_id} for executor')
batch_task_env = [azbatch.models.EnvironmentSetting(name=k, value=v) for k, v in executor.env.items()]
commands = [' '.join(executor.command)]
            if executor.stdout:
                # currently the source path isn't quoted; we should map the workdir into /tes-wd explicitly
                # to avoid having to use the shell expansion here
                commands += backend_common.commands.generate_copy_commands("/tes-wd/$AZ_BATCH_TASK_ID/stdout.txt", executor.stdout)
            if executor.stderr:
                commands += backend_common.commands.generate_copy_commands("/tes-wd/$AZ_BATCH_TASK_ID/stderr.txt", executor.stderr)
# TODO: Handle 'workdir' parameter from TES
            # 'batch_task' avoids shadowing this method's TES 'task' parameter
            batch_task = azbatch.models.TaskAddParameter(
                id=task_id,
                environment_settings=batch_task_env,
                command_line=f"""/bin/sh -c "set -e; {';'.join(commands)}; wait" """,
                container_settings=azbatch.models.TaskContainerSettings(
                    image_name=executor.image,
                    container_run_options=task_container_run_options
                )
            )
            self.batch_client.task.add(job_id=job_id, task=batch_task)
return job_id