in packages/cdk/lib/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py [0:0]
def get_workflow_from_s3(s3_uri: str, localpath: str, workflow_type: str) -> dict:
    """
    Retrieves a workflow from S3
    :param s3_uri: The S3 URI to the workflow (e.g. s3://bucketname/path/to/workflow.zip)
    :param localpath: The location on the local filesystem to download the workflow
    :param workflow_type: Type of workflow to expect (e.g. wdl, cwl, etc)
    :rtype: dict of `data` and `files`
    :raises RuntimeError: if the URI has no object key, the object cannot be
        downloaded, or a `workflow.zip` object cannot be parsed
    If the object is a generic file the file is set as `workflowSource`
    If the object is a `workflow.zip` file containing a single file, that file is set as `workflowSource`
    If the object is a `workflow.zip` file containing multiple files with a MANIFEST.json the MANIFEST is expected to have
    * a mainWorkflowURL property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource`
    * optionally, if an inputFileURLs property exists that provides a list of relative file paths in the zip to input.json, it will be used to set `workflowInputs`
    * optionally, if an optionFileURL property exists that provides a relative file path in the zip to an options.json file, it will be used to set `workflowOptions`
    If the object is a `workflow.zip` file containing multiple files without a MANIFEST.json
    * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource`
    * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found
    * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions`
    If the object is a `workflow.zip` file containing multiple files, the `workflow.zip` file is set as `workflowDependencies`
    """
    u = urlparse(s3_uri)
    # u.path carries a leading "/"; strip it to obtain the S3 object key.
    key = u.path[1:]
    if not key:
        # Fail fast on a malformed URI before constructing any AWS clients.
        raise RuntimeError("invalid or missing S3 object key")
    s3 = boto3.resource("s3")
    bucket = s3.Bucket(u.netloc)
    data = dict()
    files = dict()
    try:
        file = path.join(localpath, path.basename(key))
        bucket.download_file(key, file)
    except botocore.exceptions.ClientError as e:
        # Chain the cause so the underlying AWS error is not lost.
        raise RuntimeError(f"invalid S3 object: {e}") from e
    if path.basename(file) == "workflow.zip":
        try:
            props = parse_workflow_zip_file(file, workflow_type)
        except Exception as e:
            raise RuntimeError(f"{s3_uri} is not a valid workflow.zip file: {e}") from e
        # parse_workflow_zip_file may supply either or both sections;
        # look each up once and merge only when present.
        zip_data = props.get("data")
        if zip_data:
            data.update(zip_data)
        zip_files = props.get("files")
        if zip_files:
            files.update(zip_files)
    else:
        # NOTE: the open binary handle is returned to the caller, which is
        # responsible for closing it after use.
        files["workflowSource"] = open(file, "rb")
    return {"data": data, "files": files}