in odps/ml/engine.py [0:0]
def _do_execute(self, expr_dag, src_expr, **kwargs):
    expr = expr_dag.root
    kwargs['_output_models_only'] = self._is_output_model_only(src_expr)
    kw = kwargs.copy()
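    # Dispatch on the type of the source expression: trained model expressions,
    # metrics result expressions and plain DataFrame expressions take different paths.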
    if isinstance(src_expr, ODPSModelExpr):
        ui = kw.pop('ui')
        progress_proportion = kw.pop('progress_proportion', 1)
        download_progress = progress_proportion
        ui_group = kw.pop('group', None)
        if hasattr(src_expr, '_source_data'):
            result_expr = src_expr
        else:
            if not context.is_cached(src_expr):
                temp_name = self._gen_model_name(src_expr)
                download_progress = 0.1 * progress_proportion
                self._do_persist(expr_dag, src_expr, temp_name, ui=ui,
                                 progress_proportion=0.9 * progress_proportion, group=ui_group, **kw)
            result_expr = src_expr.get_cached(context.get_cached(src_expr))
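        # Offline models are returned as a PMML document; tables models are
        # materialized by executing each of their sub-collections.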
        if result_expr._is_offline_model:
            from .expr.models.pmml import PmmlResult
            from .runners import XFlowNodeRunner

            model = result_expr._source_data
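            # Two retrieval paths: fetch the PMML directly through the model API,
            # or dump it into a volume partition with the `modeltransfer` XFlow
            # node and read it back from the volume.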
            if not options.ml.use_model_transfer:
                pmml = model.get_model()
                return PmmlResult(pmml)
            else:
                volume_name = options.ml.model_volume
                if not self._odps.exist_volume(volume_name):
                    self._odps.create_parted_volume(volume_name)
                vol_part = hashlib.md5(utils.to_binary(model.name)).hexdigest()
                tempobj.register_temp_volume_partition(self._odps, (volume_name, vol_part))

                algo_params = {
                    'modelName': model.name,
                    'volumeName': volume_name,
                    'partition': vol_part,
                    'format': 'pmml'
                }
                runner = XFlowNodeRunner(self, 'modeltransfer', algo_params, {}, {},
                                         ui=ui, progress_proportion=download_progress, group=ui_group)
                runner.execute()
                pmml = self._odps.open_volume_reader(volume_name, vol_part, model.name + '.xml').read()
                self._odps.delete_volume_partition(volume_name, vol_part)
                return PmmlResult(utils.to_str(pmml))
        else:
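            # A tables model holds several collections; execute each one and
            # wrap the per-key results into a single TablesModelResult.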
            from .expr.models.base import TablesModelResult

            results = dict()
            frac = 1.0 / len(result_expr._model_collections)
            for key, item in six.iteritems(result_expr._model_collections):
                result = item.execute(ui=ui, progress_proportion=frac * 0.1 * progress_proportion,
                                      group=ui_group)
                results[key] = result
            return TablesModelResult(result_expr._model_params, results)
    elif isinstance(src_expr, MetricsResultExpr):
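        # Metrics expressions run the underlying algorithm once, writing into
        # generated temporary tables, then derive the metric value from those
        # tables via the expression's calculator.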
        if not src_expr.executed:
            expr.tables = dict((pt.name, self._gen_table_name(src_expr)) for pt in src_expr.output_ports)
            gen_params = expr.convert_params(src_expr)
            ui = kw.pop('ui')
            progress_proportion = kw.pop('progress_proportion', 1)
            ui_group = kw.pop('group', None)
            engine_kw = getattr(src_expr, '_engine_kw', {})
            engine_kw['lifecycle'] = options.temp_lifecycle
            if hasattr(src_expr, '_cases'):
                kw['_cases'] = src_expr._cases
            self._run(src_expr._algo, gen_params, src_expr.algo_meta, engine_kw, ui,
                      progress_proportion=progress_proportion, group=ui_group, **kw)
            src_expr.executed = True
        if options.ml.dry_run:
            return None
        else:
            if hasattr(src_expr, '_result_callback'):
                callback = src_expr._result_callback
            else:
                callback = lambda v: v
            return callback(expr.calculator(self._odps))
    else:
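        # Plain DataFrame expressions: persist into a temporary table first,
        # then execute the persisted DataFrame to fetch the result.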
        temp_name = self._gen_table_name(src_expr)
        persist_kw = kwargs.copy()
        persist_kw['_table'] = temp_name
        expr.persist_kw = persist_kw
        ui = kw.pop('ui')
        progress_proportion = kw.pop('progress_proportion', 1)
        ui_group = kw.pop('group', None)
        kw['lifecycle'] = options.temp_lifecycle
        df = self._do_persist(expr_dag, src_expr, temp_name, ui=ui,
                              progress_proportion=0.9 * progress_proportion, group=ui_group, **kw)
        return df.execute(ui=ui, progress_proportion=0.1 * progress_proportion, group=ui_group)