def get_workflow_from_s3()

in packages/cdk/lib/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py


import boto3
import botocore.exceptions
from os import path
from urllib.parse import urlparse


def get_workflow_from_s3(s3_uri: str, localpath: str, workflow_type: str):
    """
    Retrieves a workflow from S3

    :param s3_uri: The S3 URI to the workflow (e.g. s3://bucketname/path/to/workflow.zip)
    :param localpath: The location on the local filesystem to download the workflow
    :param workflow_type: Type of workflow to expect (e.g. wdl or cwl)

    :rtype: dict with `data` and `files` keys

    If the object is a generic file, it is set as `workflowSource`

    If the object is a `workflow.zip` file containing a single file, that file is set as `workflowSource`

    If the object is a `workflow.zip` file containing multiple files with a MANIFEST.json, the MANIFEST is expected
    to have (see the example below):
      * a mainWorkflowURL property that provides a relative file path in the zip to a workflow file, which will be set as `workflowSource`
      * optionally, an inputFileURLs property that provides a list of relative file paths in the zip to input JSON files, which will be used to set `workflowInputs`
      * optionally, an optionFileURL property that provides a relative file path in the zip to an options.json file, which will be used to set `workflowOptions`
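
      For example, a MANIFEST.json may look like the following (the file names are hypothetical):

        {
          "mainWorkflowURL": "main.wdl",
          "inputFileURLs": ["inputs.json"],
          "optionFileURL": "options.json"
        }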

    If the object is a `workflow.zip` file containing multiple files without a MANIFEST.json (see the example below):
      * a `main` workflow file with an extension matching the workflow_type is expected and will be set as `workflowSource`
      * optionally, if `inputs*.json` files are found in the root level of the zip, they will be set as `workflowInputs(_\d)*` in the order they are found
      * optionally, if an `options.json` file is found in the root level of the zip, it will be set as `workflowOptions`
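
      For example (hypothetical file names), such a zip for a `wdl` workflow could contain:

        main.wdl      (set as `workflowSource`)
        tasks.wdl     (shipped inside the zip, which becomes `workflowDependencies`)
        inputs.json   (set as `workflowInputs`)
        options.json  (set as `workflowOptions`)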

    If the object is a `workflow.zip` file containing multiple files, the `workflow.zip` file is set as `workflowDependencies`
    """
    s3 = boto3.resource("s3")

    # split the URI into the bucket name (netloc) and the object key (path without its leading "/")
    u = urlparse(s3_uri)
    bucket = s3.Bucket(u.netloc)
    key = u.path[1:]

    data = dict()
    files = dict()

    if not key:
        raise RuntimeError("invalid or missing S3 object key")

    try:
        # download the object into `localpath`, keeping its original base name
        file = path.join(localpath, path.basename(key))
        bucket.download_file(key, file)
    except botocore.exceptions.ClientError as e:
        raise RuntimeError(f"invalid S3 object: {e}")

    # zipped bundles are unpacked into their constituent submission parts
    if path.basename(file) == "workflow.zip":
        try:
            props = parse_workflow_zip_file(file, workflow_type)
        except Exception as e:
            raise RuntimeError(f"{s3_uri} is not a valid workflow.zip file: {e}")

        if props.get("data"):
            data.update(props.get("data"))

        if props.get("files"):
            files.update(props.get("files"))
    else:
        files["workflowSource"] = open(file, "rb")

    return {"data": data, "files": files}