in metaflow/includefile.py
def convert(self, value, param, ctx):
# Click can call convert multiple times, so we need to make sure we only
# convert once. This function returns a DelayedEvaluationParameter (if an
# upload still needs to be performed) or an IncludedFile if not.
if isinstance(value, (DelayedEvaluationParameter, IncludedFile)):
return value
# Value will be a string containing one of two things:
# - Scenario A: a JSON blob indicating that the file has already been uploaded.
# The scenario in which this happens is as follows:
# + `step-functions create` is called and the IncludeFile has a default
# value. At the time of creation, the file is uploaded and a URL is
# returned; this URL is packaged in a blob by Uploader and passed to
# step-functions as the value of the parameter.
# + when the step function actually runs, the value is passed to click
# through METAFLOW_INIT_XXX; this value is the one returned above
# - Scenario B: A path. The path can either be:
# + B.1: <prefix>://<something> like s3://foo/bar or local:///foo/bar
# (right now, we are disabling support for this because the artifact
# can change, unlike all other artifacts; it is trivial to re-enable)
# + B.2: an actual path to a local file like /foo/bar
# In case B.1, we would just store an *external* reference to the file (so we
# would not upload anything). In case B.2, we do want to upload the file, but
# we defer that to the DelayedEvaluationParameter step.
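# For illustration only, a Scenario A blob might look roughly like
#   {"type": "uploader-v1", "url": "s3://bucket/path/to/file",
#    "is_text": true, "encoding": "utf-8"}
# (the exact keys are whatever CURRENT_UPLOADER produces, so treat this shape
# as an assumption, not a contract).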
# ctx can be one of two things:
# - the click context (when called normally)
# - the ParameterContext (when called through _eval_default)
# If not a ParameterContext, we convert it to that
if not isinstance(ctx, ParameterContext):
ctx = ParameterContext(
flow_name=ctx.obj.flow.name,
user_name=get_username(),
parameter_name=param.name,
logger=ctx.obj.echo,
ds_type=ctx.obj.datastore_impl.TYPE,
configs=None,
)
if len(value) > 0 and (value.startswith("{") or value.startswith('"{')):
# This is a blob; no URL starts with `{`. We are thus in scenario A
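# Illustrative (hypothetical) forms of what `value` may hold at this point:
#   '{"url": "s3://bucket/key", ...}'       -> one json.loads() yields a dict
#   '"{\"url\": \"s3://bucket/key\", ...}"' -> first loads() yields a str, so
#                                              we decode a second time below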
try:
value = json.loads(value)
# Handle doubly-encoded (quoted) JSON strings: if the first loads() gave us a
# string instead of a dict, decode it once more.
if not isinstance(value, dict):
value = json.loads(value)
except json.JSONDecodeError as e:
raise MetaflowException(
"IncludeFile '%s' (value: %s) is malformed" % (param.name, value)
) from e
# All processing has already been done, so we just convert to an `IncludedFile`
return IncludedFile(value)
path = os.path.expanduser(value)
prefix_pos = path.find("://")
if prefix_pos > 0:
# Scenario B.1
raise MetaflowException(
"IncludeFile using a direct reference to a file in cloud storage is no "
"longer supported. Contact the Metaflow team if you need this supported"
)
# if _dict_dataclients.get(path[:prefix_pos]) is None:
# self.fail(
# "IncludeFile: no handler for external file of type '%s' "
# "(given path is '%s')" % (path[:prefix_pos], path)
# )
# # We don't need to do anything more -- the file is already uploaded so we
# # just return a blob indicating how to get the file.
# return IncludedFile(
# CURRENT_UPLOADER.encode_url(
# "external", path, is_text=self._is_text, encoding=self._encoding
# )
# )
else:
# Scenario B.2
# Check if this is a valid local file
try:
with open(path, mode="r") as _:
pass
except OSError:
self.fail("IncludeFile: could not open file '%s' for reading" % path)
handler = _dict_dataclients.get(ctx.ds_type)
if handler is None:
self.fail(
"IncludeFile: no data-client for datastore of type '%s'"
% ctx.ds_type
)
# Now that we have done the preliminary checks, we delay the actual upload
# until later (so that it happens after Pylint has checked the flow); here we
# only prepare everything needed for it.
lambda_ctx = _DelayedExecContext(
flow_name=ctx.flow_name,
path=path,
is_text=self._is_text,
encoding=self._encoding,
handler_type=ctx.ds_type,
echo=ctx.logger,
)
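# Note: _DelayedExecContext snapshots everything the deferred upload needs
# (path, flow name, datastore type, logger) so that _delayed_eval_func below
# is self-contained and does not need the click/ParameterContext object when
# it eventually runs.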
def _delayed_eval_func(ctx=lambda_ctx, return_str=False):
incl_file = IncludedFile(
CURRENT_UPLOADER.store(
ctx.flow_name,
ctx.path,
ctx.is_text,
ctx.encoding,
_dict_dataclients[ctx.handler_type],
ctx.echo,
)
)
if return_str:
return json.dumps(incl_file.descriptor)
return incl_file
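# The DelayedEvaluationParameter below merely wraps _delayed_eval_func: the
# upload happens only when the returned object is actually evaluated, and
# evaluating it with return_str=True yields the JSON blob form described in
# Scenario A above.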
return DelayedEvaluationParameter(
ctx.parameter_name,
"default",
functools.partial(_delayed_eval_func, ctx=lambda_ctx),
)
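# Hypothetical usage sketch (illustration only; names below are made up):
#   maybe_delayed = include_file_param.convert("/foo/bar", param, ctx)
#   if isinstance(maybe_delayed, DelayedEvaluationParameter):
#       included = maybe_delayed()  # performs the deferred upload -> IncludedFile
# See DelayedEvaluationParameter for the exact evaluation API.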