def _do_persist()

in odps/df/backends/pd/engine.py [0:0]


    def _do_persist(self, expr_dag, expr, name, ui=None, project=None,
                    partitions=None, partition=None, odps=None, lifecycle=None,
                    progress_proportion=1, execute_percent=0.5, overwrite=True,
                    drop_table=False, create_table=True, drop_partition=False,
                    create_partition=False, cast=False, schema=None, **kwargs):
        """Execute the expression and write its result into an ODPS table.

        The expression DAG is executed through ``self._do_execute`` and the
        resulting frame is written with ``write_table`` into the target
        table, which is fetched or created depending on the arguments.

        :param expr_dag: expression DAG; converted via ``self._convert_table``
            and rewritten in place via ``self._rewrite`` before execution.
        :param expr: source expression, forwarded to ``self._do_execute``.
        :param name: target table. May be a table name, a ``Table`` (project,
            schema and name are then taken from it) or a ``Partition`` (its
            partition spec and table are then used).
        :param ui: progress UI handle, forwarded to execution and writing.
        :param project: project of the target table.
        :param partitions: column name(s) of the result to use as partition
            columns (dynamic partitioning). A single value or tuple is
            normalized to a list.
        :param partition: a specific partition to write into; resolved with
            ``self._get_partition``.
        :param odps: ODPS entrance; defaults to ``self._odps``.
        :param lifecycle: lifecycle passed to ``create_table`` when the table
            is created here.
        :param progress_proportion: share of overall progress given to this
            call; split between execution and writing by ``execute_percent``.
        :param execute_percent: fraction of ``progress_proportion`` assigned
            to the execution phase.
        :param overwrite: forwarded to ``write_table``.
        :param drop_table: when true, delete the target table (if it exists)
            before writing.
        :param create_table: when false, the table is always fetched with
            ``get_table`` instead of being created when missing.
        :param drop_partition: when true and the table already exists, delete
            the target partition before writing (``partition`` mode only).
        :param create_partition: create the target partition
            (``partition`` mode only).
        :param cast: forwarded to ``write_table``.
        :param schema: schema of the target table; defaults to
            ``odps.schema``.
        :param kwargs: extra options forwarded to ``self._do_execute``.
        :return: a ``DataFrame`` over the target table, filtered down to the
            written partition when ``partition`` is given.
        :raises ODPSError: if no ODPS entrance is available.
        :raises ValueError: if ``drop_partition`` / ``create_partition`` is
            combined with ``partitions`` or with neither partition argument,
            or if a name in ``partitions`` is not a column of the result.
        :raises CompileError: if the existing target table is partitioned but
            neither ``partition`` nor ``partitions`` is specified.
        """
        expr_dag = self._convert_table(expr_dag)
        self._rewrite(expr_dag)

        # Unwrap Partition -> (partition spec, Table), then
        # Table -> (project, schema, name).
        if isinstance(name, Partition):
            partition = name.partition_spec
            name = name.table
        if isinstance(name, Table):
            table = name
            project = table.project.name
            if table.get_schema():
                schema = table.get_schema().name
            name = table.name

        src_expr = expr
        expr = expr_dag.root
        odps = odps or self._odps
        if odps is None:
            raise ODPSError('ODPS entrance should be provided')
        schema = schema or odps.schema

        df = self._do_execute(expr_dag, src_expr, ui=ui,
                              progress_proportion=progress_proportion * execute_percent, **kwargs)
        t_schema = TableSchema(columns=df.columns)

        if drop_table:
            odps.delete_table(name, project=project, schema=schema, if_exists=True)

        if partitions is not None:
            # Dynamic partitioning: partition values come from result columns.
            if drop_partition:
                raise ValueError('Cannot drop partitions when specify `partitions`')
            if create_partition:
                raise ValueError('Cannot create partitions when specify `partitions`')

            if isinstance(partitions, tuple):
                partitions = list(partitions)
            if not isinstance(partitions, list):
                partitions = [partitions]

            for p in partitions:
                if p not in t_schema:
                    raise ValueError(
                            'Partition field(%s) does not exist in DataFrame schema' % p)

            # Split the result schema into regular columns and partition
            # columns, keeping each partition column's converted type.
            t_schema = df_schema_to_odps_schema(t_schema)
            columns = [c for c in t_schema.columns if c.name not in partitions]
            ps = [TableSchemaPartition(name=t, type=t_schema.get_type(t)) for t in partitions]
            t_schema = TableSchema(columns=columns, partitions=ps)

            if odps.exist_table(name, project=project, schema=schema) or not create_table:
                t = odps.get_table(name, project=project, schema=schema)
            else:
                t = odps.create_table(name, t_schema, project=project, schema=schema, lifecycle=lifecycle)
        elif partition is not None:
            # Static partitioning: write into one explicitly given partition.
            if odps.exist_table(name, project=project, schema=schema) or not create_table:
                t = odps.get_table(name, project=project, schema=schema)
                partition = self._get_partition(partition, t)

                if drop_partition:
                    t.delete_partition(partition, if_exists=True)
                if create_partition:
                    t.create_partition(partition, if_not_exists=True)
            else:
                partition = self._get_partition(partition)
                project_obj = odps.get_project(project)
                # Columns named in the partition spec become partition
                # columns (typed as string); the rest are regular columns.
                column_names = [n for n in expr.schema.names if n not in partition]
                column_types = [
                    df_type_to_odps_type(expr.schema[n].type, project=project_obj)
                    for n in column_names
                ]
                partition_names = list(partition.keys)
                partition_types = ['string'] * len(partition_names)
                t = odps.create_table(
                    name, TableSchema.from_lists(
                        column_names, column_types, partition_names, partition_types
                    ),
                    project=project, lifecycle=lifecycle, schema=schema
                )
                if create_partition is None or create_partition is True:
                    t.create_partition(partition)
        else:
            # Plain, non-partitioned target table.
            if drop_partition:
                raise ValueError('Cannot drop partition for non-partition table')
            if create_partition:
                raise ValueError('Cannot create partition for non-partition table')

            if odps.exist_table(name, project=project, schema=schema) or not create_table:
                t = odps.get_table(name, project=project, schema=schema)
                if t.table_schema.partitions:
                    # Interpolate the table name; previously the literal
                    # '%s' placeholder leaked into the error message.
                    raise CompileError('Cannot insert into partition table %s without specifying '
                                       '`partition` or `partitions`.' % name)
            else:
                t = odps.create_table(
                    name,
                    df_schema_to_odps_schema(t_schema),
                    project=project,
                    lifecycle=lifecycle,
                    schema=schema,
                )

        write_table(df, t, ui=ui, cast=cast, overwrite=overwrite, partitions=partitions, partition=partition,
                    progress_proportion=progress_proportion * (1 - execute_percent))

        if partition:
            # Narrow the returned frame to the partition just written.
            result = DataFrame(t)
            filters = [result[k] == partition[k] for k in partition.keys]
            return result.filter(*filters)
        return DataFrame(t)