def execute()

in odps/df/backends/odpssql/tunnel.py [0:0]


    def execute(self, expr, ui=None, progress_proportion=1,
                head=None, tail=None, verify=False, update_progress_count=50):
        if isinstance(expr, (ProjectCollectionExpr, Summary)) and \
                len(expr.fields) == 1 and \
                isinstance(expr.fields[0], Count):
            expr = expr.fields[0]

        columns, partitions, count = (None, ) * 3
        if isinstance(expr, Count):
            if isinstance(expr.input, Column):
                input = expr.input.input
            else:
                input = expr.input
        elif isinstance(expr, SliceCollectionExpr):
            input = expr.input
        else:
            input = expr

        if verify:
            return self._can_propagate(input)
        if not self._can_propagate(input):
            return

        while True:
            if isinstance(input, FilterPartitionCollectionExpr):
                partition_kv = self._filter_on_partition(input)
                if not partition_kv:
                    return
                partitions = self._to_partition_spec(partition_kv)
                if not columns:
                    columns = self._projection_on_source(input)
                break
            else:
                ret = self._filter_on_partition(input)
                if ret:
                    partitions = self._to_partition_spec(ret)
                    input = input.input
                    continue

                ret = self._projection_on_source(input)
                if ret:
                    columns = ret
                    input = input.input
                    continue
                break

        table = next(expr.data_source())
        partition, filter_all_partitions = None, True
        if table.table_schema.partitions:
            if partitions is not None:
                partition = self._partition_prefix(
                    [p.name for p in table.table_schema.partitions], partitions)
                if partition is None:
                    return
                if len(table.table_schema.partitions) != len(partitions):
                    filter_all_partitions = False
            else:
                filter_all_partitions = False

        if isinstance(expr, Count):
            if not filter_all_partitions:
                # if not filter all partitions, fall back to ODPS SQL to calculate count
                return
            try:
                with _open_reader(table, partition=partition) as reader:
                    ui.inc(progress_proportion)
                    return reader.count
            except ODPSError:
                return
        else:
            logger.info(
                'Try fetching data from tunnel. If it takes a long time, please try running your code '
                'with distributed capabilities, see related section in '
                'https://www.alibabacloud.com/help/en/maxcompute/use-cases/use-a-pyodps-node-to-download'
                '-data-to-a-local-directory-for-processing-or-to-process-data-online for more details.'
            )
            ui.status('Try to download data with tunnel...', clear_keys=True)
            if isinstance(expr, SliceCollectionExpr):
                if expr.start:
                    raise ExpressionError('For ODPS backend, slice\'s start cannot be specified')
                count = expr.stop
            try:
                data = []

                start, size, step = None, None, None
                if head is not None:
                    size = min(head, count) if count is not None else head
                elif tail is not None:
                    if filter_all_partitions:
                        start = None if count is None else max(count - tail, 0)
                        size = tail if count is None else min(count, tail)
                    else:
                        # tail on multi partitions, just fall back to SQL
                        return
                else:
                    size = count

                fetch_partitions = [partition] if filter_all_partitions else \
                    (p.name for p in table.iterate_partitions(partition))
                if tail is not None:
                    fetch_partitions = list(fetch_partitions)[::-1]
                if size is None:
                    fetch_partitions = list(fetch_partitions)

                cum = 0
                last_percent = 0
                for curr_part, partition in izip(itertools.count(1), fetch_partitions):
                    rest = size - cum if size is not None else None
                    finished = False

                    with _open_reader(table, partition=partition) as reader:
                        if tail is not None and start is None:
                            s = max(reader.count - tail, 0)
                            start = s if start is None else max(s, start)

                        unique_columns = list(OrderedDict.fromkeys(columns)) if columns is not None else None
                        for i, r in izip(itertools.count(1),
                                         reader.read(start=start, count=rest, columns=unique_columns)):
                            if size is not None and cum > size - 1:
                                finished = True
                                break
                            cum += 1
                            if cum % update_progress_count == 0:
                                if size is not None:
                                    p = float(cum) / size * progress_proportion
                                    ui.inc(p - last_percent)
                                    last_percent = p
                                else:
                                    p = ((curr_part - 1) / len(fetch_partitions) +
                                         float(i) / reader.count / len(fetch_partitions)) * progress_proportion
                                    ui.inc(p - last_percent)
                                    last_percent = p
                            if partition:
                                spec = PartitionSpec(partition) if not isinstance(partition, PartitionSpec) \
                                    else partition
                                self._fill_back_partition_values(r, table, spec.kv)
                            if columns is None or len(unique_columns) == len(columns):
                                data.append(r.values)
                            else:
                                data.append([r[n] for n in columns])

                    if finished:
                        break

                if last_percent < progress_proportion:
                    ui.inc(progress_proportion - last_percent)
                return ResultFrame(data, schema=expr._schema)
            except NoPermission:
                raise
            except ODPSError:
                return