in api-server/tesazure/backends/common/__init__.py [0:0]
def _strip_leading_path_components(path):
    """
    Returns `path` as a relative POSIX path string with its first two components dropped

    For an absolute path the first component is the root ('/'), so
    '/tes-wd/shared/wf/file.txt' becomes 'shared/wf/file.txt'. A path with two or
    fewer components yields '.'.
    """
    return str(pathlib.PurePosixPath(*pathlib.PurePosixPath(path).parts[2:]))


def mangle_task_for_submitter(task):
    """
    Applies transformations for the detected submitter

    Only tasks whose 'ms-submitter-name' tag equals 'cromwell' are modified; any
    other task is returned untouched. For a Cromwell task:

    - inputs without content, and outputs, whose url equals their path and starts
      with the execution environment prefix get their url replaced by a SAS-signed
      blob URL (read token for inputs, write token for outputs, both valid 48 hours)
    - blobs listed under the Cromwell execution directory (tag
      'ms-submitter-cromwell-executiondir') whose filename is not already one of
      the mapped inputs are appended as extra 'injected-<name>' inputs

    :param task: task object; `tags`, `inputs` and `outputs` are read, and
                 `inputs` / `outputs` are reassigned (their elements are mutated in place)
    :return: None
    """
    if task.tags.get('ms-submitter-name', None) != 'cromwell':
        return

    exec_environ_prefix = '/tes-wd/shared'

    container_name = current_app.config.get('CROMWELL_STORAGE_CONTAINER_NAME', None)
    if not container_name:
        # Setting config key to None creates a storage container per workflow
        # TODO: Document this if/when blobfuse support mounting all containers in an account
        # NOTE(review): container_name stays None if the workflow-id tag is missing too - confirm how create_container reacts
        container_name = task.tags.get('ms-submitter-workflow-id', None)

    blob_service = azblob.BlockBlobService(account_name=current_app.config['STORAGE_ACCOUNT_NAME'], account_key=current_app.config['STORAGE_ACCOUNT_KEY'])
    blob_service.create_container(container_name)

    # Both tokens share a single expiry timestamp
    sas_expiry = datetime.datetime.utcnow() + datetime.timedelta(hours=48)
    # NOTE(review): an account-level permission enum is passed to a container-level SAS call - confirm whether a container permission enum was intended
    read_sas_token = blob_service.generate_container_shared_access_signature(
        container_name,
        permission=azstorage.models.AccountPermissions.READ,
        expiry=sas_expiry,
    )
    write_sas_token = blob_service.generate_container_shared_access_signature(
        container_name,
        permission=azstorage.models.AccountPermissions.WRITE,
        expiry=sas_expiry,
    )

    replacement_inputs = []
    # filenames of inputs mapped below; a set because it is only used for membership tests
    cromwell_input_filenames = set()
    for task_input in task.inputs:
        if not task_input.content and task_input.url == task_input.path and task_input.url.startswith(exec_environ_prefix):
            # input is managed by Cromwell, map it to blob container
            blob_filename = _strip_leading_path_components(task_input.path)
            task_input.url = blob_service.make_blob_url(container_name, blob_filename, sas_token=read_sas_token)
            # used to determine which preprocessed files in execution dir do not need injecting
            cromwell_input_filenames.add(pathlib.PurePosixPath(task_input.path).name)
        replacement_inputs.append(task_input)

    # Some Cromwell commands e.g. write_tsv() write outputs to execution dir on the Cromwell server during preprocessing.
    # These are not passed on to TES inputs, so unless we are using a shared filesystem (i.e. Azure Files) we need to
    # inject them as additional task inputs ourselves.
    execution_dir = task.tags.get('ms-submitter-cromwell-executiondir', None)
    if not execution_dir:
        current_app.logger.warning("ms-submitter-name=cromwell but tag 'ms-submitter-cromwell-executiondir' is missing; skipping automatic upload of Cromwell-preprocessed workflow inputs")
    else:
        blob_execution_dir = _strip_leading_path_components(execution_dir)
        for blob_path in blob_service.list_blob_names(container_name, blob_execution_dir):
            blob_name = pathlib.PurePosixPath(blob_path).name
            # do not inject inputs for files present in execution_dir that are already TES tasks inputs,
            # nor for a blob named exactly like the execution dir itself
            if blob_name in cromwell_input_filenames or blob_path == blob_execution_dir:
                continue
            replacement_inputs.append(tesmodels.TesInput(
                name=f'injected-{blob_name}',
                path=str(pathlib.PurePosixPath(execution_dir) / blob_name),
                url=blob_service.make_blob_url(container_name, blob_path, sas_token=read_sas_token),
            ))
    task.inputs = replacement_inputs

    replacement_outputs = []
    for task_output in task.outputs:
        if task_output.url == task_output.path and task_output.path.startswith(exec_environ_prefix):
            blob_filename = _strip_leading_path_components(task_output.path)
            task_output.url = blob_service.make_blob_url(container_name, blob_filename, sas_token=write_sas_token)
        replacement_outputs.append(task_output)
    task.outputs = replacement_outputs