in odps/df/backends/sqlalchemy/engine.py [0:0]
def _do_persist(self, expr_dag, expr, name, partitions=None, partition=None,
                odps=None, project=None, ui=None,
                progress_proportion=1, execute_percent=0.5, lifecycle=None,
                overwrite=True, drop_table=False, create_table=True,
                drop_partition=False, create_partition=False, cast=False, **kwargs):
    """Execute ``expr`` and persist the result into the ODPS table ``name``.

    The expression is executed first (reporting ``execute_percent`` of
    ``progress_proportion``); the resulting frame is then uploaded with
    ``write_table`` (reporting the remainder).

    :param expr_dag: expression DAG; converted and rewritten in place first.
    :param expr: the source expression, passed on to execution.
    :param name: target table name.
    :param partitions: column name (or list/tuple of names) of the result
        to use as partition columns of the target table. Cannot be combined
        with ``drop_partition`` / ``create_partition``.
    :param partition: a single target partition of a partitioned table.
    :param odps: ODPS entry object; defaults to ``self._odps``.
    :param project: project that owns the target table.
    :param ui: progress UI passed through to execution and writing.
    :param progress_proportion: share of overall progress this call reports.
    :param execute_percent: fraction of ``progress_proportion`` attributed to
        execution; the rest is attributed to writing.
    :param lifecycle: lifecycle passed to ``create_table`` whenever this call
        creates the target table.
    :param overwrite: passed through to ``write_table``.
    :param drop_table: accepted for interface compatibility; not used by this
        engine.
    :param create_table: when False, the table is always looked up with
        ``get_table`` instead of being created.
    :param drop_partition: delete ``partition`` (if it exists) before writing;
        only applied when the table already exists.
    :param create_partition: whether to create ``partition`` before writing.
    :param cast: passed through to ``write_table``.
    :return: a DataFrame over the target table, filtered down to
        ``partition`` when one was given.
    :raises DependencyNotInstalledError: if pandas is not installed.
    :raises ValueError: on an invalid combination of partition options, or
        when a ``partitions`` column is missing from the result schema.
    :raises CompileError: when writing into an existing partitioned table
        without ``partition`` or ``partitions``.
    """
    expr_dag = self._convert_table(expr_dag)
    self._rewrite(expr_dag)
    src_expr = expr
    expr = expr_dag.root
    odps = odps or self._odps

    try:
        # Only checks that pandas is available; the module itself is unused here.
        import pandas  # noqa: F401
    except ImportError:
        raise DependencyNotInstalledError('persist requires for pandas')

    # Execution reports only its share of the progress budget; writing gets the rest.
    df = self._do_execute(expr_dag, src_expr, ui=ui,
                          progress_proportion=progress_proportion * execute_percent, **kwargs)
    schema = TableSchema(columns=df.columns)

    if partitions is not None:
        # Partition values come from the data itself, so explicit partition
        # management is meaningless here.
        if drop_partition:
            raise ValueError('Cannot drop partitions when specify `partitions`')
        if create_partition:
            raise ValueError('Cannot create partitions when specify `partitions`')
        if isinstance(partitions, tuple):
            partitions = list(partitions)
        if not isinstance(partitions, list):
            partitions = [partitions, ]

        for p in partitions:
            if p not in schema:
                raise ValueError(
                    'Partition field(%s) does not exist in DataFrame schema' % p)

        # Move the partition columns out of the regular column list.
        schema = df_schema_to_odps_schema(schema)
        columns = [c for c in schema.columns if c.name not in partitions]
        ps = [Partition(name=t, type=schema.get_type(t)) for t in partitions]
        schema = TableSchema(columns=columns, partitions=ps)
        if odps.exist_table(name, project=project) or not create_table:
            t = odps.get_table(name, project=project)
        else:
            t = odps.create_table(name, schema, project=project, lifecycle=lifecycle)
    elif partition is not None:
        if odps.exist_table(name, project=project) or not create_table:
            t = odps.get_table(name, project=project)
            partition = self._get_partition(partition, t)
            if drop_partition:
                t.delete_partition(partition, if_exists=True)
            if create_partition:
                t.create_partition(partition, if_not_exists=True)
        else:
            partition = self._get_partition(partition)
            project_obj = odps.get_project(project)
            # Regular columns are everything in the result except the
            # partition keys; partition keys are created as string columns.
            column_names = [n for n in expr.schema.names if n not in partition]
            column_types = [
                df_type_to_odps_type(expr.schema[n].type, project=project_obj)
                for n in column_names
            ]
            partition_names = [n for n in partition.keys]
            partition_types = ['string'] * len(partition_names)
            t = odps.create_table(
                name, TableSchema.from_lists(
                    column_names, column_types, partition_names, partition_types
                ),
                project=project,
                lifecycle=lifecycle,
            )
            if create_partition is None or create_partition is True:
                t.create_partition(partition)
    else:
        if drop_partition:
            raise ValueError('Cannot drop partition for non-partition table')
        if create_partition:
            raise ValueError('Cannot create partition for non-partition table')

        if odps.exist_table(name, project=project) or not create_table:
            t = odps.get_table(name, project=project)
            if t.table_schema.partitions:
                raise CompileError('Cannot insert into partition table %s without specifying '
                                   '`partition` or `partitions`.' % name)
        else:
            t = odps.create_table(name, df_schema_to_odps_schema(schema), project=project,
                                  lifecycle=lifecycle)

    write_table(df, t, ui=ui, cast=cast, overwrite=overwrite, partitions=partitions, partition=partition,
                progress_proportion=progress_proportion * (1 - execute_percent))

    if partition:
        # Restrict the returned frame to the partition that was just written.
        table_df = DataFrame(t)
        filters = [table_df[k] == partition[k] for k in partition.keys]
        return table_df.filter(*filters)
    return DataFrame(t)