def _fetch()

in odps/df/backends/odpssql/engine.py [0:0]


    def _fetch(self, expr, src_expr, instance, ui, progress_proportion=1,
               cache_data=None, head=None, tail=None, use_tunnel=True,
               group=None, finish=True):
        """Download the results of an executed ODPS SQL instance as a ResultFrame.

        :param expr: expression whose results are fetched; its type determines
            the schema used to interpret downloaded records
        :param src_expr: source expression, used as the cache key and for
            back-filling dynamic/unknown schemas once concrete types are known
        :param instance: ODPS instance whose output is read when no cache
            table is available
        :param ui: progress object; receives status messages, key cleanup and
            progress increments (semantics defined elsewhere in the project)
        :param progress_proportion: share of overall progress this fetch adds
        :param cache_data: table holding cached results; when given, data is
            read from it (via table tunnel, falling back to ``head``) instead
            of from the instance
        :param head: if truthy, fetch only the first ``head`` records
        :param tail: if truthy (and ``head`` is not), fetch only the last
            ``tail`` records
        :param use_tunnel: whether to attempt the table-tunnel download first
        :param group: UI key group removed when ``finish`` is true
        :param finish: whether this call completes the operation (gates status
            messages and key cleanup)
        :return: a ``ResultFrame`` for collection/sequence results, or the
            validated raw value for a ``Scalar`` source expression
        """
        def _limit_reader(reader):
            # Apply head/tail limits; head takes precedence over tail,
            # matching both download paths below.
            if head:
                return reader[:head]
            if tail:
                return reader[max(reader.count - tail, 0):]
            return reader

        # Derive both the DataFrame-level and ODPS-level schemas from the
        # expression type; anything else (e.g. scalars) carries no schema.
        if isinstance(expr, (CollectionExpr, Summary)):
            df_schema = expr._schema
            schema = types.df_schema_to_odps_schema(df_schema, ignorecase=True)
        elif isinstance(expr, SequenceExpr):
            df_schema = TableSchema.from_lists([expr.name], [expr._data_type])
            schema = types.df_schema_to_odps_schema(df_schema, ignorecase=True)
        else:
            df_schema = None
            schema = None

        if cache_data is not None:
            if group and finish:
                ui.remove_keys(group)
            if use_tunnel:
                try:
                    if finish:
                        ui.status('Start to use tunnel to download results...')
                    with cache_data.open_reader(reopen=True) as reader:
                        reader = _limit_reader(reader)
                        try:
                            return ResultFrame([r.values for r in reader], schema=df_schema)
                        finally:
                            # Cache the result table even when the read above
                            # raises; also resolve the source schema if it was
                            # dynamic or contained Unknown column types.
                            context.cache(src_expr, cache_data)
                            # reset schema
                            if isinstance(src_expr, CollectionExpr) and \
                                    (isinstance(src_expr._schema, DynamicSchema) or
                                     any(isinstance(col.type, Unknown) for col in src_expr._schema.columns)):
                                src_expr._schema = df_schema
                            ui.inc(progress_proportion)
                except ODPSError as ex:
                    # some project has closed the tunnel download
                    # we just ignore the error
                    warnings.warn('Failed to download data by table tunnel, 10000 records will be limited.\n' +
                                  'Cause: ' + str(ex))

            if tail:
                # The head() fallback cannot read from the end of the table,
                # so tail is unsupported once the tunnel has failed or is off.
                raise NotImplementedError

            try:
                if finish:
                    ui.status('Start to use head to download results...')
                return ResultFrame(cache_data.head(head or 10000), schema=df_schema)
            finally:
                context.cache(src_expr, cache_data)
                ui.inc(progress_proportion)

        # No cache table: read the instance results directly (tunnel disabled
        # here in the original code — presumably instance tunnels are handled
        # elsewhere; confirm before changing).
        with instance.open_reader(schema=schema, use_tunnel=False) as reader:
            ui.status('Start to read instance results...')
            if not isinstance(src_expr, Scalar):
                reader = _limit_reader(reader)
                try:
                    return ResultFrame([r.values for r in reader], schema=df_schema)
                finally:
                    context.cache(src_expr, cache_data)
                    ui.inc(progress_proportion)
            else:
                # Scalar result: validate the single cell against its ODPS
                # type and cache the raw value instead of a frame.
                ui.inc(progress_proportion)
                odps_type = types.df_type_to_odps_type(src_expr._value_type, project=instance.project)
                res = types.odps_types.validate_value(reader[0][0], odps_type)
                context.cache(src_expr, res)
                return res