def convert()

in metaflow/includefile.py [0:0]


    def convert(self, value, param, ctx):
        """Convert a raw IncludeFile parameter value.

        Click can call convert multiple times, so we need to make sure to only
        convert once. This function will return a DelayedEvaluationParameter
        (if it needs to still perform an upload) or an IncludedFile if not.

        Parameters
        ----------
        value : object
            Already-converted object (returned as-is) or a string: either a
            JSON blob describing an already-uploaded file, or a local path.
        param : click.Parameter
            The click parameter being converted (used for error messages).
        ctx : click.Context or ParameterContext
            Conversion context; a click context is normalized to a
            ParameterContext.

        Raises
        ------
        MetaflowException
            If the JSON blob is malformed or a cloud-storage reference is used.
        """
        # Idempotency guard: click may invoke convert more than once.
        if isinstance(value, (DelayedEvaluationParameter, IncludedFile)):
            return value

        # Value will be a string containing one of two things:
        #  - Scenario A: a JSON blob indicating that the file has already been uploaded.
        #    This scenario this happens in is as follows:
        #      + `step-functions create` is called and the IncludeFile has a default
        #        value. At the time of creation, the file is uploaded and a URL is
        #        returned; this URL is packaged in a blob by Uploader and passed to
        #        step-functions as the value of the parameter.
        #      + when the step function actually runs, the value is passed to click
        #        through METAFLOW_INIT_XXX; this value is the one returned above
        #  - Scenario B: A path. The path can either be:
        #      + B.1: <prefix>://<something> like s3://foo/bar or local:///foo/bar
        #        (right now, we are disabling support for this because the artifact
        #        can change unlike all other artifacts. It is trivial to re-enable
        #      + B.2: an actual path to a local file like /foo/bar
        #    In the first case, we just store an *external* reference to it (so we
        #    won't upload anything). In the second case we will want to upload something,
        #    but we only do that in the DelayedEvaluationParameter step.

        # ctx can be one of two things:
        #  - the click context (when called normally)
        #  - the ParameterContext (when called through _eval_default)
        # If not a ParameterContext, we convert it to that
        if not isinstance(ctx, ParameterContext):
            ctx = ParameterContext(
                flow_name=ctx.obj.flow.name,
                user_name=get_username(),
                parameter_name=param.name,
                logger=ctx.obj.echo,
                ds_type=ctx.obj.datastore_impl.TYPE,
                configs=None,
            )

        # No URL starts with `{` so a leading `{` (or quoted `"{`) means we are
        # in scenario A. (str.startswith on an empty string is simply False, so
        # no explicit length check is needed.)
        if value.startswith(("{", '"{')):
            try:
                value = json.loads(value)
                # A doubly-encoded (quoted) JSON string decodes to a str the
                # first time; decode once more to obtain the dict.
                if not isinstance(value, dict):
                    value = json.loads(value)
            except json.JSONDecodeError as e:
                # Chain the decode error so the original parse failure stays
                # visible in the traceback.
                raise MetaflowException(
                    "IncludeFile '%s' (value: %s) is malformed" % (param.name, value)
                ) from e
            # All processing has already been done, so we just convert to an `IncludedFile`
            return IncludedFile(value)

        path = os.path.expanduser(value)

        prefix_pos = path.find("://")
        if prefix_pos > 0:
            # Scenario B.1
            raise MetaflowException(
                "IncludeFile using a direct reference to a file in cloud storage is no "
                "longer supported. Contact the Metaflow team if you need this supported"
            )
            # Deliberately kept as reference in case external references are
            # re-enabled (see scenario B.1 note above):
            # if _dict_dataclients.get(path[:prefix_pos]) is None:
            #     self.fail(
            #         "IncludeFile: no handler for external file of type '%s' "
            #         "(given path is '%s')" % (path[:prefix_pos], path)
            #     )
            # # We don't need to do anything more -- the file is already uploaded so we
            # # just return a blob indicating how to get the file.
            # return IncludedFile(
            #     CURRENT_UPLOADER.encode_url(
            #         "external", path, is_text=self._is_text, encoding=self._encoding
            #     )
            # )

        # Scenario B.2: verify up-front that the local file is readable so the
        # user gets an immediate error instead of one at delayed-upload time.
        try:
            with open(path, mode="r") as _:
                pass
        except OSError:
            self.fail("IncludeFile: could not open file '%s' for reading" % path)
        handler = _dict_dataclients.get(ctx.ds_type)
        if handler is None:
            self.fail(
                "IncludeFile: no data-client for datastore of type '%s'" % ctx.ds_type
            )

        # Now that we have done preliminary checks, we will delay uploading it
        # until later (so it happens after PyLint checks the flow, but we prepare
        # everything for it)
        lambda_ctx = _DelayedExecContext(
            flow_name=ctx.flow_name,
            path=path,
            is_text=self._is_text,
            encoding=self._encoding,
            handler_type=ctx.ds_type,
            echo=ctx.logger,
        )

        # `ctx` is bound at definition time via the default argument, so the
        # closure does not depend on the surrounding scope when it runs later.
        def _delayed_eval_func(ctx=lambda_ctx, return_str=False):
            # Perform the actual upload and wrap the resulting descriptor.
            incl_file = IncludedFile(
                CURRENT_UPLOADER.store(
                    ctx.flow_name,
                    ctx.path,
                    ctx.is_text,
                    ctx.encoding,
                    _dict_dataclients[ctx.handler_type],
                    ctx.echo,
                )
            )
            if return_str:
                return json.dumps(incl_file.descriptor)
            return incl_file

        # The default argument above already binds lambda_ctx; wrapping in
        # functools.partial(..., ctx=lambda_ctx) would be redundant.
        return DelayedEvaluationParameter(
            ctx.parameter_name,
            "default",
            _delayed_eval_func,
        )