in odps/ml/engine.py [0:0]
def _do_persist(self, expr_dag, src_expr, name, partitions=None, partition=None, project=None,
                drop_table=False, create_table=True, drop_partition=False, create_partition=False,
                **kwargs):
    """
    Run the algorithm node behind ``src_expr`` and persist / cache its outputs.

    The method assigns a target name to every output port of ``src_expr``,
    runs the algorithm (followed by any SQL produced by the persist callbacks)
    unless the node is already marked as executed, and finally caches the
    results of the output expressions.

    :param expr_dag: DAG whose ``root`` node is waited on and used to convert
        the parameters of ``src_expr``.
    :param src_expr: expression describing the algorithm node to execute.
    :param name: not referenced in this method body; kept for signature
        compatibility.
    :param partitions: not referenced in this method body.
    :param partition: not referenced in this method body.
    :param project: not referenced in this method body.
    :param drop_table: not referenced in this method body.
    :param create_table: not referenced in this method body.
    :param drop_partition: not referenced in this method body.
    :param create_partition: not referenced in this method body.
    :param kwargs: extra options. ``ui`` is mandatory; ``progress_proportion``
        (default ``1``), ``group`` (default ``None``) and ``lifecycle`` are
        read here, everything else is forwarded to ``self._run`` and
        ``self._write_persist_kw``.
    :return: when ``src_expr.is_extra_expr`` is truthy, a collection built from
        ``src_expr._table_callback`` with ``_ml_fields`` copied over; otherwise
        the cached result of the output named ``src_expr._output_name``, or
        ``None`` when no output carries that name.
    :raises KeyError: if ``ui`` is missing from ``kwargs``.
    """
    # NOTE(review): the nesting below was reconstructed from a source listing
    # whose indentation had been stripped -- confirm against the repository.
    from .runners import SQLNodeRunner
    from .enums import PortType

    expr = expr_dag.root
    kwargs['_output_models_only'] = self._is_output_model_only(src_expr)

    output_exprs = src_expr.outputs()
    shared_kw = src_expr.shared_kw
    shared_kw['required_outputs'] = dict()
    if hasattr(src_expr, 'output_ports'):
        for out_port in src_expr.output_ports:
            # optional ports nobody asked for need no target at all
            if not out_port.required and out_port.name not in output_exprs:
                continue
            if out_port.name in output_exprs:
                out_expr = output_exprs[out_port.name]
                # only generate a target when none has been assigned yet
                if not getattr(out_expr, 'persist_kw', None):
                    expr_name = self._gen_table_name(out_expr) if isinstance(out_expr, CollectionExpr) \
                        else self._gen_model_name(expr)
                    self._write_persist_kw(expr_name, out_expr, **kwargs)
            else:
                # required port without an output expression: record a
                # generated name in the shared keywords instead
                expr_name = self._gen_table_name(src_expr.node_name) if out_port.type == PortType.DATA \
                    else self._gen_model_name(src_expr.node_name)
                shared_kw['required_outputs'][out_port.name] = expr_name
    # assigned back explicitly; presumably ``shared_kw`` is a property -- confirm
    src_expr.shared_kw = shared_kw

    kw = kwargs.copy()
    ui = kw.pop('ui')
    progress_proportion = kw.pop('progress_proportion', 1)
    ui_group = kw.pop('group', None)

    engine_kw = getattr(src_expr, '_engine_kw', None)
    # an explicit ``lifecycle`` keyword wins over the global option
    lifecycle = kw.get('lifecycle') or options.lifecycle
    if lifecycle:
        if engine_kw is None:
            # ``_engine_kw`` may be absent or None; storing into None would
            # raise TypeError, so create a dict only when it is needed
            engine_kw = dict()
        engine_kw['lifecycle'] = lifecycle

    if hasattr(src_expr, '_cases'):
        kw['_cases'] = src_expr._cases

    if not options.ml.dry_run:
        self._build_output_tables(src_expr)

    sql_callbacks = []

    expr.wait_execution()

    if not src_expr.executed:
        for out_expr in six.itervalues(output_exprs):
            callback = self._handle_expr_persist(out_expr)
            if callback is not None:
                sql_callbacks.append(callback)

        gen_params = expr.convert_params(src_expr)

        # checked again after parameter conversion; presumably the conversion
        # can mark the node as executed -- confirm
        if not src_expr.executed:
            prog_ratio = 1
            sub_ratio = 0
            if sql_callbacks:
                # the algorithm takes 80% of the progress, the remaining 20%
                # is split evenly between the follow-up SQL statements
                prog_ratio = 0.8
                sub_ratio = (1 - prog_ratio) * progress_proportion / len(sql_callbacks)
            try:
                self._run(src_expr._algo, gen_params, src_expr.algo_meta, engine_kw, ui,
                          progress_proportion=prog_ratio * progress_proportion, group=ui_group, **kw)
                for cb in sql_callbacks:
                    sql = cb()
                    runner = SQLNodeRunner(self, 'SQL', dict(sql=sql), dict(), dict(), ui,
                                           progress_proportion=sub_ratio, group=ui_group)
                    runner.execute()
            finally:
                # flagged as executed even when the run fails
                src_expr.executed = True

    if getattr(src_expr, 'is_extra_expr', False):
        t = src_expr._table_callback(self._odps, src_expr)
        context.cache(src_expr, t)
        if options.ml.dry_run:
            df = CollectionExpr(_source_data=t, _schema=src_expr._schema)
        else:
            df = DataFrame(t)
        df._ml_fields = src_expr._ml_fields
        return df

    ret = None
    # every output is cached; only the one matching the requested name is returned
    for out_name, out_expr in six.iteritems(output_exprs):
        r = self._cache_expr_result(out_expr)
        if out_name == src_expr._output_name:
            ret = r
    return ret