def mangle_task_for_submitter()

in api-server/tesazure/backends/common/__init__.py [0:0]


def _strip_exec_environ_root(path):
    """
    Maps an execution-environment path to a blob name by dropping its first two path components

    For '/tes-wd/shared/wf/file.txt' the dropped components are '/' and 'tes-wd',
    which yields 'shared/wf/file.txt'.
    """
    return str(pathlib.PurePosixPath(*pathlib.PurePosixPath(path).parts[2:]))


def mangle_task_for_submitter(task):
    """
    Applies transformations for the detected submitter

    Only tasks tagged with ms-submitter-name=cromwell are touched; any other task
    is left as-is. For Cromwell tasks, the task is modified in place:

    - inputs whose url equals their path and live under the execution environment
      prefix get their url rewritten to a read-SAS blob URL;
    - blobs found under the Cromwell execution dir that are not already task inputs
      are appended as extra 'injected-<name>' inputs;
    - outputs whose url equals their path and live under the execution environment
      prefix get their url rewritten to a write-SAS blob URL.

    :param task: TES task exposing 'tags' (dict-like), 'inputs' and 'outputs'
    :raises ValueError: if neither the CROMWELL_STORAGE_CONTAINER_NAME config key nor
        the 'ms-submitter-workflow-id' tag provides a storage container name
    """
    if task.tags.get('ms-submitter-name', None) != 'cromwell':
        return

    exec_environ_prefix = '/tes-wd/shared'
    container_name = current_app.config.get('CROMWELL_STORAGE_CONTAINER_NAME', None)
    if not container_name:
        # Setting config key to None creates a storage container per workflow
        # TODO: Document this if/when blobfuse support mounting all containers in an account
        container_name = task.tags.get('ms-submitter-workflow-id', None)
    if container_name is None:
        # fail early with an actionable message instead of deep inside the storage SDK
        raise ValueError("Unable to determine storage container: config key 'CROMWELL_STORAGE_CONTAINER_NAME' is unset and tag 'ms-submitter-workflow-id' is missing")

    blob_service = azblob.BlockBlobService(account_name=current_app.config['STORAGE_ACCOUNT_NAME'], account_key=current_app.config['STORAGE_ACCOUNT_KEY'])
    blob_service.create_container(container_name)

    # NOTE(review): generate_container_shared_access_signature is documented to take
    # ContainerPermissions; AccountPermissions is passed here - confirm this is intended.
    sas_expiry = datetime.datetime.utcnow() + datetime.timedelta(hours=48)
    read_sas_token = blob_service.generate_container_shared_access_signature(
        container_name,
        permission=azstorage.models.AccountPermissions.READ,
        expiry=sas_expiry,
    )
    write_sas_token = blob_service.generate_container_shared_access_signature(
        container_name,
        permission=azstorage.models.AccountPermissions.WRITE,
        expiry=sas_expiry,
    )

    replacement_inputs = list(task.inputs)
    # used to determine which preprocessed files in execution dir do not need injecting
    cromwell_input_filenames = set()
    for task_input in replacement_inputs:
        if not task_input.content and task_input.url == task_input.path and task_input.url.startswith(exec_environ_prefix):
            # input is managed by Cromwell, map it to blob container
            blob_filename = _strip_exec_environ_root(task_input.path)
            task_input.url = blob_service.make_blob_url(container_name, blob_filename, sas_token=read_sas_token)
            cromwell_input_filenames.add(str(pathlib.PurePosixPath(task_input.path).name))

    # Some Cromwell commands e.g. write_tsv() write outputs to execution dir on the Cromwell server during preprocessing.
    # These are not passed on to TES inputs, so unless we are using a shared filesystem (i.e. Azure Files) we need to
    # inject them into the task as additional inputs ourselves.
    execution_dir = task.tags.get('ms-submitter-cromwell-executiondir', None)
    if not execution_dir:
        current_app.logger.warning("ms-submitter-name=cromwell but tag 'ms-submitter-cromwell-executiondir' is missing; skipping automatic upload of Cromwell-preprocessed workflow inputs")
    else:
        blob_execution_dir = _strip_exec_environ_root(execution_dir)
        for blob_path in blob_service.list_blob_names(container_name, blob_execution_dir):
            blob_name = str(pathlib.PurePosixPath(blob_path).name)
            # do not inject inputs for files present in execution_dir that are already TES tasks inputs,
            # nor an entry for the execution dir itself
            if blob_name in cromwell_input_filenames or blob_path == blob_execution_dir:
                continue
            replacement_inputs.append(tesmodels.TesInput(
                name=f'injected-{blob_name}',
                path=str(pathlib.PurePosixPath(execution_dir) / blob_name),
                url=blob_service.make_blob_url(container_name, blob_path, sas_token=read_sas_token),
            ))
    task.inputs = replacement_inputs

    replacement_outputs = list(task.outputs)
    for task_output in replacement_outputs:
        if task_output.url == task_output.path and task_output.path.startswith(exec_environ_prefix):
            blob_filename = _strip_exec_environ_root(task_output.path)
            task_output.url = blob_service.make_blob_url(container_name, blob_filename, sas_token=write_sas_token)
    task.outputs = replacement_outputs