metaflow/R.py (106 lines of code) (raw):
import os
import sys
from importlib import util as imp_util, machinery as imp_machinery
from tempfile import NamedTemporaryFile
from . import parameters
from .util import to_bytes
# Module-level state for the current R session. Every value below is
# (re)assigned by `run()`; the other functions in this module only read them.

# Callables looked up by name in `call_r()` and `get_r_func()`.
R_FUNCTIONS = {}
# Mapping read with the keys "package", "flow" and "wd"; stays None until
# `run()` is called, which is what `use_r()` tests for.
R_PACKAGE_PATHS = None
# Value substituted into the `--flowRDS=` option built by `entrypoint()`.
RDS_FILE_PATH = None
# Returned as-is by `container_image()`.
R_CONTAINER_IMAGE = None
# Returned as-is by `metaflow_r_version()`.
METAFLOW_R_VERSION = None
# Returned as-is by `r_version()`.
R_VERSION = None
# Returned as-is by `r_version_code()`.
R_VERSION_CODE = None
def call_r(func_name, args):
    """
    Invoke the callable registered under `func_name` in `R_FUNCTIONS`.

    Parameters
    ----------
    func_name: str
        Key of the callable in `R_FUNCTIONS`.
    args: iterable
        Positional arguments; unpacked into the call.

    The callable's return value is discarded (this function returns None).
    A failed lookup propagates unchanged (KeyError for a plain dict).
    """
    r_callable = R_FUNCTIONS[func_name]
    r_callable(*args)
def get_r_func(func_name):
    """
    Return the callable registered under `func_name` in `R_FUNCTIONS`.

    The object is returned without being called. A failed lookup propagates
    unchanged (KeyError for a plain dict).
    """
    r_callable = R_FUNCTIONS[func_name]
    return r_callable
def package_paths():
    """
    Yield `(source_path, archive_path)` pairs for the R package and the flow file.

    Nothing is yielded while `R_PACKAGE_PATHS` is None. Otherwise every
    non-hidden file below `R_PACKAGE_PATHS["package"]` is yielded with its
    path relative to that root placed under "metaflow-r/", followed by
    `R_PACKAGE_PATHS["flow"]` paired with its base name.

    Hidden entries (names starting with ".") are skipped only *below* the
    package root. The root's own location is not inspected, so a package
    installed under a hidden directory (or referenced through a path
    containing "..") is still packaged instead of silently yielding nothing.
    """
    if R_PACKAGE_PATHS is None:
        return

    root = R_PACKAGE_PATHS["package"]
    # Length of "<root>/" so slicing a walked path leaves the relative part.
    prefixlen = len("%s/" % root.rstrip("/"))
    for path, dirs, files in os.walk(root):
        # Prune hidden directories in place: os.walk (top-down) then never
        # descends into them, so none of their contents are visited.
        dirs[:] = [d for d in dirs if not d.startswith(".")]
        for fname in files:
            if fname.startswith("."):
                continue
            p = os.path.join(path, fname)
            yield p, os.path.join("metaflow-r", p[prefixlen:])

    flow = R_PACKAGE_PATHS["flow"]
    yield flow, os.path.basename(flow)
def entrypoint():
    """
    Build the shell command line that runs `metaflow-r/run_batch.R`.

    The command sets PYTHONPATH and R_LIBS_SITE inline and passes the current
    `RDS_FILE_PATH` as the `--flowRDS=` option.
    """
    command_template = (
        "PYTHONPATH=/root/metaflow "
        "R_LIBS_SITE=`Rscript -e 'cat(paste(.libPaths(), collapse=\\\":\\\"))'`:metaflow/ "
        "Rscript metaflow-r/run_batch.R --flowRDS=%s"
    )
    return command_template % RDS_FILE_PATH
def use_r():
    """Return True once `R_PACKAGE_PATHS` has been set to anything but None."""
    r_paths_configured = R_PACKAGE_PATHS is not None
    return r_paths_configured
def container_image():
    """Return the current value of `R_CONTAINER_IMAGE` (None until `run()` sets it)."""
    image = R_CONTAINER_IMAGE
    return image
def metaflow_r_version():
    """Return the current value of `METAFLOW_R_VERSION` (None until `run()` sets it)."""
    version = METAFLOW_R_VERSION
    return version
def r_version():
    """Return the current value of `R_VERSION` (None until `run()` sets it)."""
    version = R_VERSION
    return version
def r_version_code():
    """Return the current value of `R_VERSION_CODE` (None until `run()` sets it)."""
    version_code = R_VERSION_CODE
    return version_code
def working_dir():
    """
    Return `R_PACKAGE_PATHS["wd"]`, or None when `use_r()` is false.
    """
    if not use_r():
        return None
    return R_PACKAGE_PATHS["wd"]
def load_module_from_path(module_name: str, path: str):
    """
    Loads a module from a given path

    The module is registered in `sys.modules` under `module_name` before its
    code is executed, so code that looks up its own module while it is being
    executed (for example via `sys.modules[__name__]`) can find it. If
    execution raises, the previous `sys.modules` state for `module_name` is
    restored and the exception propagates.

    Parameters
    ----------
    module_name: str
        Name to assign for the loaded module. Usable for importing after loading.
    path: str
        path to the file to be loaded

    Returns
    -------
    module
        The executed module object.
    """
    loader = imp_machinery.SourceFileLoader(module_name, path)
    spec = imp_util.spec_from_loader(loader.name, loader)
    module = imp_util.module_from_spec(spec)

    # Remember what (if anything) was registered under this name so a failed
    # load leaves sys.modules exactly as it was.
    not_registered = object()
    previous = sys.modules.get(module_name, not_registered)

    # Required in order to be able to import the module by name, both later
    # and from within the module's own top-level code.
    sys.modules[module_name] = module
    try:
        loader.exec_module(module)
    except BaseException:
        if previous is not_registered:
            sys.modules.pop(module_name, None)
        else:
            sys.modules[module_name] = previous
        raise
    return module
def run(
    flow_script,
    r_functions,
    rds_file,
    metaflow_args,
    full_cmdline,
    r_paths,
    r_container_image,
    metaflow_r_version,
    r_version,
    r_version_code,
):
    """
    Record the R session state and run the flow through the Metaflow CLI.

    The flow source is written to a temporary file, loaded as the module
    "metaflowR", and its `FLOW` attribute is instantiated and handed to
    `cli.main`. The temporary file is removed on every exit path.

    Parameters
    ----------
    flow_script:
        Source text of the flow module; it must define `FLOW`.
    r_functions:
        Stored in `R_FUNCTIONS` (looked up by `call_r` / `get_r_func`).
    rds_file:
        Stored in `RDS_FILE_PATH` (used by `entrypoint`).
    metaflow_args:
        Arguments passed to `cli.main`. A non-list value is wrapped in a list.
    full_cmdline: list
        Full command line. Its first element is replaced in place by its
        base name; the part preceding the trailing `metaflow_args` is passed
        to `cli.main` as `entrypoint`.
    r_paths:
        Stored in `R_PACKAGE_PATHS` (read with keys "package", "flow", "wd").
    r_container_image, metaflow_r_version, r_version, r_version_code:
        Stored in the corresponding module-level variables.

    Notes
    -----
    If `cli.main` raises a `MetaflowException` or any other `Exception`, the
    error is printed and the process is terminated with `os._exit(1)`.
    Exceptions raised while loading the flow module propagate to the caller.
    """
    global R_FUNCTIONS, R_PACKAGE_PATHS, RDS_FILE_PATH, R_CONTAINER_IMAGE, METAFLOW_R_VERSION, R_VERSION, R_VERSION_CODE

    R_FUNCTIONS = r_functions
    R_PACKAGE_PATHS = r_paths
    RDS_FILE_PATH = rds_file
    R_CONTAINER_IMAGE = r_container_image
    METAFLOW_R_VERSION = metaflow_r_version
    R_VERSION = r_version
    R_VERSION_CODE = r_version_code

    # there's some reticulate(?) silliness which causes metaflow_args
    # not to be a list if it has only one item. Here's a workaround
    if not isinstance(metaflow_args, list):
        metaflow_args = [metaflow_args]

    # remove any reference to local path structure from R
    full_cmdline[0] = os.path.basename(full_cmdline[0])

    # Number of leading command-line items that are not metaflow arguments.
    # Computed explicitly because `full_cmdline[:-0]` would be empty when
    # `metaflow_args` is empty.
    entrypoint_len = max(len(full_cmdline) - len(metaflow_args), 0)

    tmp = NamedTemporaryFile(prefix="metaflowR.", delete=False)
    # Stays None if the exception below is raised before `cli` is imported.
    cli = None
    try:
        with tmp:
            tmp.write(to_bytes(flow_script))

        module = load_module_from_path("metaflowR", tmp.name)
        flow = module.FLOW(use_cli=False)

        from . import exception

        try:
            with parameters.flow_context(flow.__class__):
                # NOTE(review): the import is kept inside flow_context on
                # purpose, as in the original code; presumably importing cli
                # depends on the active flow context. Confirm before moving.
                from . import cli

                cli.main(
                    flow,
                    args=metaflow_args,
                    handle_exceptions=False,
                    entrypoint=full_cmdline[:entrypoint_len],
                )
        except exception.MetaflowException as e:
            if cli is not None:
                cli.print_metaflow_exception(e)
            else:
                print(e)
                sys.stdout.flush()
            # os._exit() skips `finally` blocks, so clean up explicitly.
            os.remove(tmp.name)
            os._exit(1)
        except Exception as e:
            print(e)
            sys.stdout.flush()
            # os._exit() skips `finally` blocks, so clean up explicitly.
            os.remove(tmp.name)
            os._exit(1)
    finally:
        os.remove(tmp.name)