def _handle_expr_persist()

in odps/ml/engine.py [0:0]


    def _handle_expr_persist(self, out_expr):
        """Prepare an expression that is about to be persisted.

        For a ``CollectionExpr`` whose target cannot simply be created by the
        persist step (the table already exists, or a static ``partition`` /
        dynamic ``partitions`` write was requested), the output is redirected
        to a freshly generated temporary table by rewriting
        ``out_expr.persist_kw['_table']`` and ``['_project']``, and a callback
        is returned. When invoked, the callback prepares the real target
        (dropping/creating the table or partition as requested in
        ``persist_kw``) and returns the ``INSERT ... SELECT`` SQL that copies
        the temporary table into it.

        For an ``ODPSModelExpr``, the existing model is deleted when
        ``drop_model`` is set; nothing is returned.

        :param out_expr: expression being persisted. Its ``persist_kw`` dict is
            read and, for collections, mutated in place.
        :return: a zero-argument callable producing the INSERT SQL string, or
            ``None`` when no post-processing step is needed.
        :raises CompileError: from the returned callback, when a
            non-partitioned write targets a partitioned table, or when an
            output column is missing from the target table (static
            ``partition`` case).
        """
        from ..df.backends.engine import ODPSSQLEngine

        class ODPSEngine(ODPSSQLEngine):
            # Compiles an expression to one SQL string without executing it,
            # so that the SELECT can be embedded in an INSERT statement.
            def compile(self, expr, prettify=True, libraries=None):
                expr = self._convert_table(expr)
                expr_dag = expr.to_dag()
                self._analyze(expr_dag, expr)
                new_expr = self._rewrite(expr_dag)
                sql = self._compile(new_expr, prettify=prettify, libraries=libraries)
                if isinstance(sql, list):
                    return '\n'.join(sql)
                return sql

        if isinstance(out_expr, CollectionExpr):
            persist_kw = out_expr.persist_kw
            partition = persist_kw.get('partition')
            partitions = persist_kw.get('partitions')
            drop_table = persist_kw.get('drop_table', False)
            create_table = persist_kw.get('create_table', True)
            drop_partition = persist_kw.get('drop_partition', False)
            create_partition = persist_kw.get('create_partition', False)
            overwrite = persist_kw.get('overwrite', True)
            cast = persist_kw.get('cast', False)

            expr_table = persist_kw['_table']
            expr_project = persist_kw.get('_project')
            expr_table_path = expr_table if expr_project is None else expr_project + '.' + expr_table
            action_str = 'OVERWRITE' if overwrite else 'INTO'

            def compile_select(source_table, target_table, **reorder_kw):
                # SELECT everything from ``source_table``, reordered (and cast
                # when requested) to match ``target_table``'s columns.
                expr = self._reorder(source_table.to_df(), target_table, cast=cast, **reorder_kw)
                return ODPSEngine(self._odps).compile(expr, prettify=False)

            if partitions is None and partition is None:
                if drop_table:
                    self._odps.delete_table(expr_table, project=expr_project, if_exists=True)

                # Check the project the data will actually be written to, not
                # the default one.
                if not self._odps.exist_table(expr_table, project=expr_project):
                    # Target is absent: the expression can be persisted into it
                    # directly, with no follow-up INSERT.
                    return None

                temp_table_name = self._gen_table_name(out_expr)
                out_expr.persist_kw['_table'] = temp_table_name
                out_expr.persist_kw['_project'] = None

                def callback():
                    t = self._odps.get_table(expr_table, project=expr_project)
                    if t.table_schema.partitions:
                        raise CompileError('Cannot insert into partition table %s without specifying '
                                           '`partition` or `partitions`.' % expr_table_path)
                    sql = compile_select(self._odps.get_table(temp_table_name), t)
                    return 'INSERT {0} TABLE {1} \n{2}'.format(action_str, expr_table_path, sql)

                return callback

            if partition is not None:
                # Static partition: write into the given partition of the target.
                temp_table_name = self._gen_table_name(out_expr)
                out_expr.persist_kw['_table'] = temp_table_name
                out_expr.persist_kw['_project'] = None

                def callback():
                    # ``t`` is the real destination. The computed data lives in
                    # the temporary table.
                    t = self._odps.get_table(expr_table, project=expr_project)

                    for col in out_expr.schema.columns:
                        if col.name.lower() not in t.table_schema:
                            raise CompileError('Column(%s) does not exist in target table %s, '
                                               'writing cannot be performed.' % (col.name, t.name))

                    if drop_partition:
                        t.delete_partition(partition, if_exists=True)
                    if create_partition:
                        t.create_partition(partition, if_not_exists=True)

                    sql = compile_select(self._odps.get_table(temp_table_name), t)
                    return 'INSERT {0} TABLE {1} PARTITION({2}) {3}'.format(
                        action_str, expr_table_path, partition, sql,
                    )

                return callback

            # Dynamic partitions: partition values come from output columns.
            temp_table_name = self._gen_table_name(out_expr)
            out_expr.persist_kw['_table'] = temp_table_name
            out_expr.persist_kw['_project'] = None

            if isinstance(partitions, tuple):
                partitions = list(partitions)
            if not isinstance(partitions, list):
                partitions = [partitions, ]

            def callback():
                temp_table = self._odps.get_table(temp_table_name)
                schema = temp_table.table_schema

                # Derive the target layout from the temp table: the partition
                # columns are taken out of the data columns and become
                # partition keys, in the order given by ``partitions``.
                columns = [c for c in schema.columns if c.name not in partitions]
                ps = [Partition(name=pt, type=schema.get_type(pt)) for pt in partitions]
                if drop_table:
                    self._odps.delete_table(expr_table, project=expr_project, if_exists=True)
                if create_table:
                    lifecycle = options.temp_lifecycle if is_temp_table(expr_table) else options.lifecycle
                    self._odps.create_table(expr_table, TableSchema(columns=columns, partitions=ps),
                                            project=expr_project, lifecycle=lifecycle)

                # Align the SELECT to the destination (not to the temp table
                # itself) so that column order and casts follow the real
                # target. NOTE(review): assumes ``with_partitions=True`` makes
                # ``_reorder`` emit the partition columns after the data
                # columns — confirm against ``_reorder``.
                target = self._odps.get_table(expr_table, project=expr_project)
                sql = compile_select(temp_table, target, with_partitions=True)
                return 'INSERT {0} TABLE {1} PARTITION({2}) {3}'.format(
                    action_str, expr_table_path, ', '.join(partitions), sql,
                )

            return callback
        elif isinstance(out_expr, ODPSModelExpr):
            drop_model = out_expr.persist_kw.get('drop_model', False)
            expr_model = out_expr.persist_kw['_model']

            if drop_model:
                # Offline models and table-backed models are removed through
                # different APIs.
                if out_expr._is_offline_model:
                    self._odps.delete_offline_model(expr_model, if_exists=True)
                else:
                    self._odps.delete_tables_model(expr_model, if_exists=True)