in packages/cdk/lib/wes_adapter/amazon_genomics/wes/adapters/CromwellWESAdapter.py [0:0]
def parse_workflow_manifest_file(manifest_file):
    r"""
    Reads a MANIFEST.json file for a workflow zip bundle

    :param manifest_file: String or Path-like path to a MANIFEST.json file
    :rtype: dict of `data` and `files`
    :raises KeyError: if the manifest has no `mainWorkflowURL` property
    :raises InvalidRequestError: if an input or options file URL uses a scheme
        other than none or `file`

    MANIFEST.json is expected to be formatted like:

    .. code-block:: json

       {
           "mainWorkflowURL": "relpath/to/workflow",
           "inputFileURLs": [
               "relpath/to/input-file-1",
               "relpath/to/input-file-2",
               ...
           ],
           "optionsFileURL": "relpath/to/option-file"
       }

    The `mainWorkflowURL` property provides a relative file path in the zip to a
    workflow file, which is opened and returned as `files["workflowSource"]`.
    If it carries any scheme other than `file`, the URL itself is returned as
    `data["workflowUrl"]` instead.

    The inputFileURLs property is optional and provides a list of relative file
    paths in the zip to input.json files. The list is assumed to be in the order
    the inputs should be applied - e.g. higher list index is higher priority.
    If present, the opened files are returned, in manifest order, as the list
    `files["workflowInputFiles"]`, intended for the `workflowInputs(_\d)`
    arguments.

    The optionsFileURL property is optional and provides a relative file path in
    the zip to an options.json file. If present, it will be used to set
    `workflowOptions`.

    Every entry in `files` is a file object opened in binary mode; the caller is
    responsible for closing them. If parsing fails part-way, any files already
    opened by this function are closed before the exception propagates.
    """
    data = dict()
    files = dict()
    # Directory containing the manifest; handed to workflow_manifest_url_to_path
    # together with each parsed URL.
    manifest_root = path.dirname(manifest_file)

    with open(manifest_file, "rt") as f:
        manifest = json.load(f)

    # Track every handle we open so they can be released if a later entry fails.
    opened = []

    def is_local(parsed_url):
        # expect "/path/to/file" or "file:///path/to/file"
        return not parsed_url.scheme or parsed_url.scheme == "file"

    def open_local(parsed_url):
        # root is relative to the zip root
        handle = open(workflow_manifest_url_to_path(parsed_url, manifest_root), "rb")
        opened.append(handle)
        return handle

    try:
        main_url = manifest["mainWorkflowURL"]
        u = urlparse(main_url)
        if is_local(u):
            files["workflowSource"] = open_local(u)
        else:
            # Remote workflow: pass the URL through rather than uploading a file.
            data["workflowUrl"] = main_url

        input_urls = manifest.get("inputFileURLs")
        if input_urls:
            files["workflowInputFiles"] = []
            for url in input_urls:
                u = urlparse(url)
                if not is_local(u):
                    raise InvalidRequestError(
                        f"unsupported input file url scheme for: '{url}'"
                    )
                files["workflowInputFiles"].append(open_local(u))

        options_url = manifest.get("optionsFileURL")
        if options_url:
            u = urlparse(options_url)
            if not is_local(u):
                raise InvalidRequestError(
                    f"unsupported option file url scheme for: '{options_url}'"
                )
            files["workflowOptions"] = open_local(u)
    except Exception:
        # Do not leak descriptors for files opened before the failure.
        for handle in opened:
            handle.close()
        raise

    return {"data": data, "files": files}