def parse_workflow_manifest_file()

in packages/cdk/lib/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py [0:0]


def parse_workflow_manifest_file(manifest_file):
    """
    Read a MANIFEST.json file for a workflow zip bundle.

    :param manifest_file: String or Path-like path to a MANIFEST.json file

    :rtype: dict of `data` and `files`

    :raises InvalidRequestError: if an input file URL or the options file URL
        uses a scheme other than none or ``file``

    MANIFEST.json is expected to be formatted like:

    .. code-block:: json

       {
           "mainWorkflowURL": "relpath/to/workflow",
           "inputFileURLs": [
               "relpath/to/input-file-1",
               "relpath/to/input-file-2"
           ],
           "optionsFileURL": "relpath/to/option-file"
       }

    The `mainWorkflowURL` property is required. If it has no scheme or a
    ``file`` scheme, the referenced file is opened and returned as
    `files["workflowSource"]`; otherwise the URL string is returned unchanged
    as `data["workflowUrl"]`.

    The `inputFileURLs` property is optional and provides a list of file paths
    in the zip to input.json files. The list is assumed to be in the order the
    inputs should be applied - e.g. higher list index is higher priority. If
    present and non-empty, the files are opened in list order and returned as
    the list `files["workflowInputFiles"]`.

    The `optionsFileURL` property is optional and provides a file path in the
    zip to an options.json file. If present, the file is opened and returned
    as `files["workflowOptions"]`.

    All returned file objects are opened in binary mode and are still open on
    return; closing them is left to the caller. If parsing fails part-way, any
    files already opened by this function are closed before the error is
    re-raised.
    """
    with open(manifest_file, "rt") as f:
        manifest = json.load(f)

    # bundled file paths are resolved against the manifest's directory
    # (the zip root)
    bundle_root = path.dirname(manifest_file)

    data = dict()
    files = dict()
    opened = []  # every handle opened so far, so a failure can clean up

    def is_bundled(parsed_url):
        # expect "/path/to/file" or "file:///path/to/file"
        return not parsed_url.scheme or parsed_url.scheme == "file"

    def open_bundled(parsed_url):
        handle = open(workflow_manifest_url_to_path(parsed_url, bundle_root), "rb")
        opened.append(handle)
        return handle

    try:
        main_url = manifest["mainWorkflowURL"]
        u = urlparse(main_url)
        if is_bundled(u):
            files["workflowSource"] = open_bundled(u)
        else:
            # remote workflow: pass the URL through instead of a file
            data["workflowUrl"] = main_url

        input_urls = manifest.get("inputFileURLs")
        if input_urls:
            input_files = files["workflowInputFiles"] = []
            for url in input_urls:
                u = urlparse(url)
                if not is_bundled(u):
                    raise InvalidRequestError(
                        f"unsupported input file url scheme for: '{url}'"
                    )
                input_files.append(open_bundled(u))

        options_url = manifest.get("optionsFileURL")
        if options_url:
            u = urlparse(options_url)
            if not is_bundled(u):
                raise InvalidRequestError(
                    f"unsupported option file url scheme for: '{options_url}'"
                )
            files["workflowOptions"] = open_bundled(u)
    except Exception:
        # do not leak handles that will never reach the caller
        for handle in opened:
            handle.close()
        raise

    return {"data": data, "files": files}