def createJob()

in api-server/tesazure/backends/batch/__init__.py [0:0]


    def createJob(self, task):
        """Create a new Azure Batch job for *task* and submit its executors.

        Uploads literal TES inputs (``content``) to temporary blob storage,
        builds job preparation/release tasks for input download and output
        upload, selects a pool (task-tag override, app-config override, or
        auto-pool), then adds one Batch task per TES executor.

        :param task: TES task model exposing ``inputs``, ``resources``,
            ``tags`` and ``executors``.
        :return: The new Batch job id (str), or ``False`` if a job with the
            generated id already exists.
        """
        # check Azure Batch jobs list to see if a job with the same id exists
        job_id = str(uuid.uuid4())
        existing_job_ids = [j.id for j in self.batch_client.job.list()]
        if job_id in existing_job_ids:
            current_app.logger.warning(f"Batch job {job_id} already exists!")
            return False

        current_app.logger.info(f'Creating Azure Batch job {job_id}')

        # TODO: Make a script in the container and pass inputs instead of command line
        download_commands = backend_common.commands.generate_input_download_commands(task)
        upload_commands = backend_common.commands.generate_output_upload_commands(task)

        # Upload TES inputs passed as 'content' to blob, then download them into
        # the containers during prep. FIXME: Need a cleanup routine for this
        container_name = current_app.config['BATCH_STORAGE_TMP_CONTAINER_NAME']
        blob_service = azblob.BlockBlobService(account_name=current_app.config['STORAGE_ACCOUNT_NAME'], account_key=current_app.config['STORAGE_ACCOUNT_KEY'])
        blob_service.create_container(container_name)
        # 'tes_input' rather than 'input' to avoid shadowing the builtin
        for tes_input in task.inputs:
            if tes_input.content is None:
                continue

            blob_filename = str(uuid.uuid4())
            blob_service.create_blob_from_text(container_name, blob_filename, tes_input.content)
            # Short-lived (1h) read-only SAS so the prep container can fetch the blob
            token = blob_service.generate_blob_shared_access_signature(
                container_name,
                blob_filename,
                azblob.BlobPermissions.READ,
                datetime.datetime.utcnow() + datetime.timedelta(hours=1),
            )
            url = blob_service.make_blob_url(container_name, blob_filename, sas_token=token)
            download_commands.append(backend_common.commands.generate_input_download_command(url, tes_input.path))

        # Pick an appropriate VM size and create the pool as necessary.
        # Precedence: per-task tag override > app-config override > auto-pool.
        vm_size = backend_common.determine_azure_vm_for_task(task.resources)
        override_pool_id = current_app.config.get('BATCH_OVERRIDE_POOL_ID', None)
        task_override_pool_id = task.tags.get('ms-backend-batch-pool-id', None)
        if task_override_pool_id:
            # FIX: log the task-tag pool id (not the app-config one) and close the quote
            current_app.logger.info(f"Using pool override '{task_override_pool_id}' from task tag for batch job '{job_id}'")
            pool_info = azbatch.models.PoolInformation(pool_id=task_override_pool_id)
        elif override_pool_id:
            current_app.logger.info(f"Using pool override '{override_pool_id}' from app config for batch job '{job_id}'")
            pool_info = azbatch.models.PoolInformation(pool_id=override_pool_id)
        else:
            current_app.logger.info(f"Auto-pool to be created with batch job {job_id}")

            # Tasks timeout if pulls take >60s, so pull the images at pool creation time
            executor_image_names = list(set(executor.image for executor in task.executors))

            pool_info = azbatch.models.PoolInformation(
                auto_pool_specification=azbatch.models.AutoPoolSpecification(
                    auto_pool_id_prefix='tes',
                    pool_lifetime_option='job',
                    keep_alive=current_app.config.get('BATCH_AUTOPOOL_KEEPALIVE', False),
                    pool=self._initializePoolSpec(vm_size, executor_image_names=executor_image_names)
                )
            )

        job = azbatch.models.JobAddParameter(
            id=job_id,
            pool_info=pool_info,
            on_all_tasks_complete=azbatch.models.OnAllTasksComplete.terminate_job,
            on_task_failure=azbatch.models.OnTaskFailure.perform_exit_options_job_action
        )

        current_app.logger.info(f'Adding preparation task for job {job_id}')
        task_container_run_options = '-v "$AZ_BATCH_TASK_DIR/../:/tes-wd"'
        if 'BATCH_STORAGE_FILESHARE_NAME' in current_app.config and current_app.config['BATCH_STORAGE_FILESHARE_NAME'] is not None:
            # Mount the shared Azure Files path into every task container
            azfiles_path = "/mnt/batch/tasks/shared-azfiles"
            task_container_run_options += f' -v "{azfiles_path}:/tes-wd/shared-global"'
        fileprep_task_container_run_options = '--entrypoint=/bin/sh ' + task_container_run_options
        download_commands_shell = ';'.join(download_commands) if download_commands else 'true'

        job.job_preparation_task = azbatch.models.JobPreparationTask(
            container_settings=azbatch.models.TaskContainerSettings(
                image_name=current_app.config.get('FILETRANSFER_CONTAINER_IMAGE'),
                container_run_options=fileprep_task_container_run_options
            ),
            # mkdir is required to ensure permissions are right on folder
            # if created by Docker from -v syntax, owned by root and permissions
            # errors ensue
            command_line=f""" -c "set -e; set -o pipefail; mkdir -p /tes-wd/shared; {download_commands_shell}; wait" """,
        )

        if upload_commands:
            # prep task is always used for mkdir above at a minimum, but if
            # changing this logic later, recall that release task cannot be
            # specified without prep task
            current_app.logger.info(f'Adding release task for job {job_id}')
            job.job_release_task = azbatch.models.JobReleaseTask(
                container_settings=azbatch.models.TaskContainerSettings(
                    image_name=current_app.config.get('FILETRANSFER_CONTAINER_IMAGE'),
                    container_run_options=fileprep_task_container_run_options
                ),
                command_line=f""" -c "set -e; set -o pipefail; {';'.join(upload_commands)}; wait" """
            )

        with _handle_batch_error():
            self.batch_client.job.add(job)

        for executor in task.executors:
            task_id = str(uuid.uuid4())
            current_app.logger.info(f'Creating Azure Batch task {task_id} for executor')

            batch_task_env = [azbatch.models.EnvironmentSetting(name=k, value=v) for k, v in executor.env.items()]

            commands = [' '.join(executor.command)]
            if executor.stdout:
                # currently the source path isn't quoted; we should map the workdir into /tes-wd explicitly
                # to avoid having to use the shell expansion here
                # FIX: stdout must be copied to executor.stdout (was executor.stderr)
                commands += backend_common.commands.generate_copy_commands("/tes-wd/$AZ_BATCH_TASK_ID/stdout.txt", executor.stdout)
            if executor.stderr:
                commands += backend_common.commands.generate_copy_commands("/tes-wd/$AZ_BATCH_TASK_ID/stderr.txt", executor.stderr)

            # TODO: Handle 'workdir' parameter from TES
            # 'batch_task' rather than 'task' to avoid shadowing the TES task parameter
            batch_task = azbatch.models.TaskAddParameter(
                id=task_id,
                environment_settings=batch_task_env,
                command_line=f"""/bin/sh -c "set -e; {';'.join(commands)}; wait" """,
                container_settings=azbatch.models.TaskContainerSettings(
                    image_name=executor.image,
                    container_run_options=task_container_run_options
                )
            )
            self.batch_client.task.add(job_id=job_id, task=batch_task)

        return job_id