def _do_persist()

in odps/ml/engine.py [0:0]


    def _do_persist(self, expr_dag, src_expr, name, partitions=None, partition=None, project=None,
                    drop_table=False, create_table=True, drop_partition=False, create_partition=False,
                    **kwargs):
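        # Materialize the outputs of an ML algorithm expression: run the algorithm
        # node (plus any follow-up persist SQL), then return the cached result bound
        # to the source expression's designated output, or a DataFrame over the
        # backing table for "extra" expressions.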
        from .runners import SQLNodeRunner
        from .enums import PortType

        expr = expr_dag.root
        kwargs['_output_models_only'] = self._is_output_model_only(src_expr)

        output_exprs = src_expr.outputs()
        shared_kw = src_expr.shared_kw
        shared_kw['required_outputs'] = dict()
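        # For each declared output port: if an output expression is bound to it and
        # has no persist settings yet, write generated table/model persist kwargs
        # onto it; required ports without a bound expression get a generated name
        # recorded in shared_kw['required_outputs'] so they are still materialized.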
        if hasattr(src_expr, 'output_ports'):
            for out_port in src_expr.output_ports:
                if not out_port.required and out_port.name not in output_exprs:
                    continue

                if out_port.name in output_exprs:
                    out_expr = output_exprs[out_port.name]
                    if not getattr(out_expr, 'persist_kw', None):
                        expr_name = self._gen_table_name(out_expr) if isinstance(out_expr, CollectionExpr) \
                            else self._gen_model_name(expr)
                        self._write_persist_kw(expr_name, out_expr, **kwargs)
                else:
                    expr_name = self._gen_table_name(src_expr.node_name) if out_port.type == PortType.DATA \
                        else self._gen_model_name(src_expr.node_name)
                    shared_kw['required_outputs'][out_port.name] = expr_name
        src_expr.shared_kw = shared_kw

        kw = kwargs.copy()
        ui = kw.pop('ui')
        progress_proportion = kw.pop('progress_proportion', 1)
        ui_group = kw.pop('group', None)
        # Engine-specific keyword arguments attached to the source expression; fall
        # back to an empty dict so the lifecycle assignment below cannot fail when
        # the attribute is missing.
        engine_kw = getattr(src_expr, '_engine_kw', None) or dict()

        # An explicit lifecycle passed by the caller takes precedence over the
        # global option.
        if kw.get('lifecycle'):
            engine_kw['lifecycle'] = kw['lifecycle']
        elif options.lifecycle:
            engine_kw['lifecycle'] = options.lifecycle

        if hasattr(src_expr, '_cases'):
            kw['_cases'] = src_expr._cases

        if not options.ml.dry_run:
            self._build_output_tables(src_expr)

        sql_callbacks = []

        expr.wait_execution()

        if not src_expr.executed:
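            # Collect callbacks that will emit the SQL needed to persist each output
            # expression once the algorithm node itself has run (executed below).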
            for out_expr in six.itervalues(output_exprs):
                callback = self._handle_expr_persist(out_expr)
                if callback is not None:
                    sql_callbacks.append(callback)

        gen_params = expr.convert_params(src_expr)

        if not src_expr.executed:
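            # Split the progress budget: the algorithm run takes the full share unless
            # there are SQL callbacks, in which case it takes 80% and the callbacks
            # divide the remaining 20% evenly.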
            prog_ratio = 1
            sub_ratio = 0
            if sql_callbacks:
                prog_ratio = 0.8
                sub_ratio = (1 - prog_ratio) * progress_proportion / len(sql_callbacks)
            try:
                self._run(src_expr._algo, gen_params, src_expr.algo_meta, engine_kw, ui,
                          progress_proportion=prog_ratio * progress_proportion, group=ui_group, **kw)

                for cb in sql_callbacks:
                    sql = cb()
                    runner = SQLNodeRunner(self, 'SQL', dict(sql=sql), dict(), dict(), ui,
                                           progress_proportion=sub_ratio, group=ui_group)
                    runner.execute()
            finally:
                src_expr.executed = True

        if getattr(src_expr, 'is_extra_expr', False):
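            # Extra expressions resolve to a concrete table: build it via the table
            # callback, cache it, and wrap it as a DataFrame (or a plain
            # CollectionExpr under dry-run) carrying the ML field metadata.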
            t = src_expr._table_callback(self._odps, src_expr)
            context.cache(src_expr, t)

            if options.ml.dry_run:
                df = CollectionExpr(_source_data=t, _schema=src_expr._schema)
            else:
                df = DataFrame(t)
            df._ml_fields = src_expr._ml_fields
            return df

        ret = None
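        # Cache every output expression's result and return the one bound to the
        # source expression's designated output.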
        for out_name, out_expr in six.iteritems(output_exprs):
            r = self._cache_expr_result(out_expr)
            if out_name == src_expr._output_name:
                ret = r

        return ret
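
For reference, the progress accounting above can be read on its own: when follow-up SQL callbacks exist, the main algorithm run is charged 80% of the persist call's progress budget and the callbacks share the remaining 20% evenly. A minimal standalone sketch of that split (the split_progress name and the numbers in the usage lines are illustrative, not part of the engine):

    def split_progress(progress_proportion, n_callbacks):
        # Mirrors the prog_ratio / sub_ratio computation in _do_persist: without
        # callbacks the run gets the whole budget; with callbacks it gets 80% and
        # each callback an equal slice of the remaining 20%.
        prog_ratio, sub_ratio = 1, 0
        if n_callbacks:
            prog_ratio = 0.8
            sub_ratio = (1 - prog_ratio) * progress_proportion / n_callbacks
        return prog_ratio * progress_proportion, sub_ratio

    # e.g. a persist call with a 0.5 budget and two SQL callbacks:
    # the algorithm run reports 0.4, each callback roughly 0.05.
    main_share, per_callback = split_progress(0.5, 2)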