def _do_persist()

in odps/df/backends/odpssql/engine.py [0:0]


    def _do_persist(self, expr_dag, expr, name, partitions=None, partition=None, project=None, ui=None,
                    progress_proportion=1, lifecycle=None, hints=None, priority=None,
                    running_cluster=None, overwrite=True, drop_table=False, create_table=True,
                    drop_partition=False, create_partition=None, cast=False, schema=None, **kw):
        group = kw.get('group')
        libraries = kw.pop('libraries', None)
        image = kw.pop('image', None)

        if isinstance(name, Partition):
            partition = name.partition_spec
            name = name.table
        if isinstance(name, Table):
            table = name
            project = table.project.name
            if table.get_schema():
                schema = table.get_schema().name
            name = table.name

        lifecycle = options.temp_lifecycle if name.startswith(TEMP_TABLE_PREFIX) else lifecycle

        self._ctx.default_schema = schema or self._ctx.default_schema
        if self._odps.is_schema_namespace_enabled(hints):
            self._ctx.default_schema = self._ctx.default_schema or "default"

        expr_dag = self._convert_table(expr_dag)
        self._rewrite(expr_dag)

        src_expr = expr
        expr = expr_dag.root

        should_cache = False

        if drop_table:
            self._odps.delete_table(name, project=project, schema=schema, if_exists=True)

        if project is not None or schema is not None:
            project = project or self._ctx._odps.project
            schema = schema or self._ctx.default_schema
        if project is None:
            table_name = name
        elif schema is None:
            table_name = '%s.`%s`' % (project, name)
        else:
            table_name = '%s.%s.`%s`' % (project, schema, name)

        project_obj = self._odps.get_project(project)
        if partitions is None and partition is None:
            # the non-partitioned table
            if drop_partition:
                raise ValueError('Cannot drop partition for non-partition table')
            if create_partition:
                raise ValueError('Cannot create partition for non-partition table')

            if self._odps.exist_table(name, project=project, schema=schema) or not create_table:
                t = self._odps.get_table(name, project=project, schema=schema)
                if t.table_schema.partitions:
                    raise CompileError('Cannot insert into partition table %s without specifying '
                                       '`partition` or `partitions`.')
                expr = self._reorder(expr, t, cast=cast)
            else:
                # We don't use `CREATE TABLE ... AS` because it will report `table already exists`
                # when service retries.
                if isinstance(expr, CollectionExpr):
                    t_schema = types.df_schema_to_odps_schema(expr.schema, ignorecase=True)
                else:
                    col_name = expr.name
                    tp = types.df_type_to_odps_type(expr.dtype, project=project_obj)
                    t_schema = TableSchema.from_lists([col_name, ], [tp, ])
                self._odps.create_table(name, TableSchema(columns=t_schema.columns),
                                        project=project, schema=schema, lifecycle=lifecycle)

            sql = self._compile(expr, prettify=False, libraries=libraries)
            action_str = 'OVERWRITE' if overwrite else 'INTO'
            format_sql = lambda s: 'INSERT {0} TABLE {1} \n{2}'.format(action_str, table_name, s)
            if isinstance(sql, list):
                sql[-1] = format_sql(sql[-1])
            else:
                sql = format_sql(sql)

            should_cache = True
        elif partition is not None:
            if self._odps.exist_table(name, project=project, schema=schema) or not create_table:
                t = self._odps.get_table(name, project=project, schema=schema)
                partition = self._get_partition(partition, t)

                if drop_partition:
                    t.delete_partition(partition, if_exists=True)
                if create_partition:
                    t.create_partition(partition, if_not_exists=True)
            else:
                partition = self._get_partition(partition)
                column_names = [n for n in expr.schema.names if n not in partition]
                column_types = [
                    types.df_type_to_odps_type(expr.schema[n].type, project=project_obj)
                    for n in column_names
                ]
                partition_names = [n for n in partition.keys]
                partition_types = ['string'] * len(partition_names)
                t = self._odps.create_table(
                    name, TableSchema.from_lists(
                        column_names, column_types, partition_names, partition_types
                    ),
                    project=project, lifecycle=lifecycle, schema=schema)
                if create_partition is None or create_partition is True:
                    t.create_partition(partition)

            expr = self._reorder(expr, t, cast=cast)
            sql = self._compile(expr, prettify=False, libraries=libraries)

            action_str = 'OVERWRITE' if overwrite else 'INTO'
            format_sql = lambda s: 'INSERT {0} TABLE {1} PARTITION({2}) \n{3}'.format(
                action_str, table_name, partition, s
            )
            if isinstance(sql, list):
                sql[-1] = format_sql(sql[-1])
            else:
                sql = format_sql(sql)
        else:
            if isinstance(partitions, tuple):
                partitions = list(partitions)
            if not isinstance(partitions, list):
                partitions = [partitions, ]

            if isinstance(expr, CollectionExpr):
                t_schema = types.df_schema_to_odps_schema(expr.schema, ignorecase=True)
            else:
                col_name = expr.name
                tp = types.df_type_to_odps_type(expr.dtype, project=project_obj)
                t_schema = TableSchema.from_lists([col_name, ], [tp, ])

            for p in partitions:
                if p not in t_schema:
                    raise ValueError(
                        'Partition field(%s) does not exist in DataFrame schema' % p)

            columns = [c for c in t_schema.columns if c.name not in partitions]
            ps = [TableSchema.TablePartition(name=pt, type=t_schema.get_type(pt)) for pt in partitions]
            if self._odps.exist_table(name, project=project, schema=schema) or not create_table:
                t = self._odps.get_table(name, project=project, schema=schema)
            else:
                t = self._odps.create_table(name, TableSchema(columns=columns, partitions=ps),
                                            project=project, lifecycle=lifecycle, schema=schema)
            if drop_partition:
                raise ValueError('Cannot drop partitions when specify `partitions`')
            if create_partition:
                raise ValueError('Cannot create partitions when specify `partitions`')
            expr = expr[[c.name for c in expr.schema if c.name not in partitions] + partitions]

            expr = self._reorder(expr, t, cast=cast, with_partitions=True)
            sql = self._compile(expr, prettify=False, libraries=libraries)

            action_str = 'OVERWRITE' if overwrite else 'INTO'
            format_sql = lambda s: 'INSERT {0} TABLE {1} PARTITION({2}) \n{3}'.format(
                action_str, table_name, ', '.join(partitions), s
            )
            if isinstance(sql, list):
                sql[-1] = format_sql(sql[-1])
            else:
                sql = format_sql(sql)

        sql = self._join_sql(sql)

        logger.info('Sql compiled:\n' + sql)

        try:
            instance = self._run(sql, ui, progress_proportion=progress_proportion, hints=hints,
                                 priority=priority, running_cluster=running_cluster, group=group,
                                 libraries=libraries, image=image, schema=schema)
        except ParseError as ex:
            logger.error("Failed to run DF generated SQL: %s:\n%s", str(ex), sql)
            raise
        finally:
            self._ctx.close()  # clear udfs and resources generated

        t = self._odps.get_table(name, project=project, schema=schema)
        if should_cache and not is_source_collection(src_expr):
            # TODO: support cache partition
            context.cache(src_expr, t)
        if partition:
            filters = []
            df = DataFrame(t)
            for k in partition.keys:
                # actual type of partition and column type may mismatch
                filters.append(df[k] == Scalar(partition[k]).cast(df[k].dtype))
            res = df.filter(*filters)
        else:
            res = DataFrame(t)
        if kw.get('ret_instance', False) is True:
            return instance, res
        return res