def _do_execute()

in odps/ml/engine.py [0:0]


    def _do_execute(self, expr_dag, src_expr, **kwargs):
        """
        Execute ``src_expr`` and return its result, dispatching on the expression type.

        * ``ODPSModelExpr``: unless the expression already carries ``_source_data`` or is
          cached in ``context``, the model is first persisted under a generated name.
          Offline models are returned as a ``PmmlResult`` (fetched with ``get_model()``,
          or through a ``modeltransfer`` XFlow run into a volume partition when
          ``options.ml.use_model_transfer`` is set). Other models are returned as a
          ``TablesModelResult`` built by executing every item of ``_model_collections``.
        * ``MetricsResultExpr``: the algorithm is run unless ``src_expr.executed`` is
          already set, then the value of ``expr.calculator`` is returned, passed through
          ``src_expr._result_callback`` when that attribute exists. ``None`` is returned
          when ``options.ml.dry_run`` is set.
        * anything else: the expression is persisted under a generated temporary table
          name and the ``execute()`` result of the persisted object is returned.

        :param expr_dag: DAG whose ``root`` is the expression node to run
        :param src_expr: the expression whose type selects the branch above
        :param kwargs: execution options. ``ui`` is popped without a default in every
            branch that actually runs something; ``progress_proportion`` (default 1) and
            ``group`` (default None) are optional. Remaining items are forwarded to
            ``_do_persist`` / ``_run``.
        :return: ``PmmlResult``, ``TablesModelResult``, the metrics value, ``None`` on
            dry run, or the result of executing the persisted expression
        :raises KeyError: if ``ui`` is missing from ``kwargs`` in a branch that needs it
        """
        expr = expr_dag.root

        kwargs['_output_models_only'] = self._is_output_model_only(src_expr)

        # ``kw`` is the working copy the UI options are popped from; ``kwargs`` itself is
        # kept intact because the last branch stores a full copy of it in ``persist_kw``.
        kw = kwargs.copy()
        if isinstance(src_expr, ODPSModelExpr):
            ui = kw.pop('ui')
            progress_proportion = kw.pop('progress_proportion', 1)
            # Share of the progress given to the step that follows persisting. It is the
            # whole proportion unless a persist step has to run first (see below).
            download_progress = progress_proportion
            ui_group = kw.pop('group', None)

            if hasattr(src_expr, '_source_data'):
                result_expr = src_expr
            else:
                if not context.is_cached(src_expr):
                    temp_name = self._gen_model_name(src_expr)
                    # Persisting takes 90% of the progress, leaving 10% for the rest.
                    download_progress = 0.1 * progress_proportion
                    self._do_persist(expr_dag, src_expr, temp_name, ui=ui,
                                     progress_proportion=0.9 * progress_proportion, group=ui_group, **kw)

                result_expr = src_expr.get_cached(context.get_cached(src_expr))

            if result_expr._is_offline_model:
                from .expr.models.pmml import PmmlResult
                from .runners import XFlowNodeRunner

                model = result_expr._source_data
                if not options.ml.use_model_transfer:
                    pmml = model.get_model()
                    return PmmlResult(pmml)
                else:
                    volume_name = options.ml.model_volume
                    if not self._odps.exist_volume(volume_name):
                        self._odps.create_parted_volume(volume_name)

                    # The partition name is the MD5 hex digest of the model name.
                    vol_part = hashlib.md5(utils.to_binary(model.name)).hexdigest()
                    # NOTE(review): presumably this lets the partition be cleaned up later
                    # if the explicit delete below is never reached - confirm in tempobj.
                    tempobj.register_temp_volume_partition(self._odps, (volume_name, vol_part))
                    algo_params = {
                        'modelName': model.name,
                        'volumeName': volume_name,
                        'partition': vol_part,
                        'format': 'pmml'
                    }
                    runner = XFlowNodeRunner(self, 'modeltransfer', algo_params, {}, {},
                                             ui=ui, progress_proportion=download_progress, group=ui_group)
                    runner.execute()
                    pmml = self._odps.open_volume_reader(volume_name, vol_part, model.name + '.xml').read()
                    self._odps.delete_volume_partition(volume_name, vol_part)
                    return PmmlResult(utils.to_str(pmml))
            else:
                from .expr.models.base import TablesModelResult

                collections = result_expr._model_collections
                # Split the remaining progress evenly between the items. An empty
                # collection yields an empty result instead of a ZeroDivisionError.
                frac = 1.0 / len(collections) if collections else 0.0
                results = dict()
                for key, item in six.iteritems(collections):
                    # Use ``download_progress`` rather than a fixed 10%: when no persist
                    # step ran, the items must account for the whole proportion.
                    results[key] = item.execute(ui=ui, progress_proportion=frac * download_progress,
                                                group=ui_group)
                return TablesModelResult(result_expr._model_params, results)
        elif isinstance(src_expr, MetricsResultExpr):
            if not src_expr.executed:
                # One generated table name per output port of the expression.
                expr.tables = dict((pt.name, self._gen_table_name(src_expr)) for pt in src_expr.output_ports)
                gen_params = expr.convert_params(src_expr)

                ui = kw.pop('ui')
                progress_proportion = kw.pop('progress_proportion', 1)
                ui_group = kw.pop('group', None)
                # NOTE(review): when ``src_expr`` defines ``_engine_kw`` this is that very
                # dict, so the ``lifecycle`` entry below is written into it in place.
                engine_kw = getattr(src_expr, '_engine_kw', {})

                engine_kw['lifecycle'] = options.temp_lifecycle
                if hasattr(src_expr, '_cases'):
                    kw['_cases'] = src_expr._cases

                self._run(src_expr._algo, gen_params, src_expr.algo_meta, engine_kw, ui,
                          progress_proportion=progress_proportion, group=ui_group, **kw)

                # Mark as run so a later call skips straight to computing the result.
                src_expr.executed = True

            if options.ml.dry_run:
                return None

            if hasattr(src_expr, '_result_callback'):
                return src_expr._result_callback(expr.calculator(self._odps))
            return expr.calculator(self._odps)
        else:
            temp_name = self._gen_table_name(src_expr)
            # Built from ``kwargs`` (not ``kw``), so it still contains the UI options.
            persist_kw = kwargs.copy()
            persist_kw['_table'] = temp_name
            expr.persist_kw = persist_kw

            ui = kw.pop('ui')
            progress_proportion = kw.pop('progress_proportion', 1)
            ui_group = kw.pop('group', None)

            kw['lifecycle'] = options.temp_lifecycle
            # Persisting takes 90% of the progress, executing the persisted object 10%.
            df = self._do_persist(expr_dag, src_expr, temp_name, ui=ui,
                                  progress_proportion=0.9 * progress_proportion, group=ui_group, **kw)
            return df.execute(ui=ui, progress_proportion=0.1 * progress_proportion, group=ui_group)