in odps/ml/engine.py [0:0]
def _handle_expr_persist(self, out_expr):
    """Prepare persisting of ``out_expr`` and return an optional post-step.

    For a ``CollectionExpr`` the persist options are read from
    ``out_expr.persist_kw``. When the result must be inserted into an
    existing table, or into static (``partition``) or dynamic
    (``partitions``) partitions, the output of ``out_expr`` is redirected to
    a generated temporary table by rewriting ``persist_kw['_table']`` and
    ``persist_kw['_project']``. A callback is then returned. When called,
    it may prepare the target (drop/create partition or table) and returns
    the ``INSERT ... SELECT`` SQL that copies the temporary table into the
    target.

    For an ``ODPSModelExpr`` the existing model is deleted when
    ``drop_model`` is set, and nothing is returned.

    :param out_expr: expression about to be persisted.
    :return: a zero-argument callable returning an SQL string, or ``None``
        when no redirection is needed.
    :raises CompileError: (from the returned callback) when the target table
        is partitioned but neither ``partition`` nor ``partitions`` was
        given, or when a column of ``out_expr`` is missing from the target
        table of a static-partition insert.
    """
    from ..df.backends.engine import ODPSSQLEngine

    class ODPSEngine(ODPSSQLEngine):
        def compile(self, expr, prettify=True, libraries=None):
            # Compile ``expr`` to SQL text only; when the compiler yields a
            # list of statements they are joined with newlines.
            expr = self._convert_table(expr)
            expr_dag = expr.to_dag()
            self._analyze(expr_dag, expr)
            new_expr = self._rewrite(expr_dag)
            sql = self._compile(new_expr, prettify=prettify, libraries=libraries)
            if isinstance(sql, list):
                return '\n'.join(sql)
            return sql

    if isinstance(out_expr, CollectionExpr):
        kw = out_expr.persist_kw
        partition = kw.get('partition')
        partitions = kw.get('partitions')
        drop_table = kw.get('drop_table', False)
        create_table = kw.get('create_table', True)
        drop_partition = kw.get('drop_partition', False)
        create_partition = kw.get('create_partition', False)
        overwrite = kw.get('overwrite', True)
        cast = kw.get('cast', False)

        expr_table = kw['_table']
        expr_project = kw.get('_project')
        expr_table_path = expr_table if expr_project is None else expr_project + '.' + expr_table
        action_str = 'OVERWRITE' if overwrite else 'INTO'

        def redirect_to_temp_table():
            # Make ``out_expr`` write into a freshly named temporary table
            # (in the default project); the callback later copies it into
            # the real target. Returns the temporary table name.
            temp_name = self._gen_table_name(out_expr)
            out_expr.persist_kw['_table'] = temp_name
            out_expr.persist_kw['_project'] = None
            return temp_name

        if partitions is None and partition is None:
            if drop_table:
                self._odps.delete_table(expr_table, project=expr_project, if_exists=True)

            # Check for the table in the project it will be written to, not
            # just the default project.
            if not self._odps.exist_table(expr_table, project=expr_project):
                # Nothing to redirect: the original ``_table`` target is kept.
                return None

            temp_table_name = redirect_to_temp_table()

            def callback():
                t = self._odps.get_table(expr_table, project=expr_project)
                if t.table_schema.partitions:
                    raise CompileError('Cannot insert into partition table %s without specifying '
                                       '`partition` or `partitions`.' % expr_table_path)
                expr = self._odps.get_table(temp_table_name).to_df()
                expr = self._reorder(expr, t, cast=cast)
                sql = ODPSEngine(self._odps).compile(expr, prettify=False)
                return 'INSERT {0} TABLE {1} \n{2}'.format(action_str, expr_table_path, sql)

            return callback

        if partition is not None:
            temp_table_name = redirect_to_temp_table()

            def callback():
                # Validate and prepare the *target* table; the temporary
                # table only supplies the data to insert.
                t = self._odps.get_table(expr_table, project=expr_project)
                for col in out_expr.schema.columns:
                    if col.name.lower() not in t.table_schema:
                        raise CompileError('Column(%s) does not exist in target table %s, '
                                           'writing cannot be performed.' % (col.name, t.name))
                if drop_partition:
                    t.delete_partition(partition, if_exists=True)
                if create_partition:
                    t.create_partition(partition, if_not_exists=True)

                expr = self._odps.get_table(temp_table_name).to_df()
                expr = self._reorder(expr, t, cast=cast)
                sql = ODPSEngine(self._odps).compile(expr, prettify=False)
                return 'INSERT {0} TABLE {1} PARTITION({2}) {3}'.format(
                    action_str, expr_table_path, partition, sql,
                )

            return callback

        # Dynamic partitions: ``partitions`` names the output columns that
        # become partition columns of the target table.
        temp_table_name = redirect_to_temp_table()
        if isinstance(partitions, tuple):
            partitions = list(partitions)
        if not isinstance(partitions, list):
            partitions = [partitions, ]

        def callback():
            t = self._odps.get_table(temp_table_name)
            schema = t.table_schema

            # Split the temp schema into data columns and partition columns
            # for the target table definition.
            columns = [c for c in schema.columns if c.name not in partitions]
            ps = [Partition(name=pt, type=schema.get_type(pt)) for pt in partitions]
            if drop_table:
                self._odps.delete_table(expr_table, project=expr_project, if_exists=True)
            if create_table:
                lifecycle = options.temp_lifecycle if is_temp_table(expr_table) else options.lifecycle
                self._odps.create_table(expr_table, TableSchema(columns=columns, partitions=ps),
                                        project=expr_project, lifecycle=lifecycle)

            expr = t.to_df()
            # NOTE(review): this reorders against the temporary table, so the
            # partition columns keep their position in the temp schema; a
            # dynamic-partition INSERT presumably needs them last, in
            # ``partitions`` order - confirm whether this should reorder
            # against the target table instead.
            expr = self._reorder(expr, t, cast=cast, with_partitions=True)
            sql = ODPSEngine(self._odps).compile(expr, prettify=False)
            return 'INSERT {0} TABLE {1} PARTITION({2}) {3}'.format(
                action_str, expr_table_path, ', '.join(partitions), sql,
            )

        return callback
    elif isinstance(out_expr, ODPSModelExpr):
        drop_model = out_expr.persist_kw.get('drop_model', False)
        expr_model = out_expr.persist_kw['_model']
        if drop_model:
            # Offline models and table-backed models are deleted through
            # different entry points.
            if out_expr._is_offline_model:
                self._odps.delete_offline_model(expr_model, if_exists=True)
            else:
                self._odps.delete_tables_model(expr_model, if_exists=True)