in odps/df/backends/odpssql/tunnel.py [0:0]
def execute(self, expr, ui=None, progress_proportion=1,
            head=None, tail=None, verify=False, update_progress_count=50):
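    # A projection or summary whose only field is count() is really a count
    # query; unwrap it so the fast count path below can handle it.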
    if isinstance(expr, (ProjectCollectionExpr, Summary)) and \
            len(expr.fields) == 1 and \
            isinstance(expr.fields[0], Count):
        expr = expr.fields[0]

    columns, partitions, count = (None, ) * 3
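    # Walk down to the collection whose source table will actually be read.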
    if isinstance(expr, Count):
        if isinstance(expr.input, Column):
            input = expr.input.input
        else:
            input = expr.input
    elif isinstance(expr, SliceCollectionExpr):
        input = expr.input
    else:
        input = expr

    if verify:
        return self._can_propagate(input)
    if not self._can_propagate(input):
        return
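    # Peel partition filters and column projections off the expression tree,
    # recording them so the tunnel download can apply them directly.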
    while True:
        if isinstance(input, FilterPartitionCollectionExpr):
            partition_kv = self._filter_on_partition(input)
            if not partition_kv:
                return
            partitions = self._to_partition_spec(partition_kv)
            if not columns:
                columns = self._projection_on_source(input)
            break
        else:
            ret = self._filter_on_partition(input)
            if ret:
                partitions = self._to_partition_spec(ret)
                input = input.input
                continue

            ret = self._projection_on_source(input)
            if ret:
                columns = ret
                input = input.input
                continue
        break
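    # Resolve the source table, then match the collected partition filter
    # against its partition columns; a shorter prefix means only the leading
    # partition levels are pinned.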
    table = next(expr.data_source())

    partition, filter_all_partitions = None, True
    if table.table_schema.partitions:
        if partitions is not None:
            partition = self._partition_prefix(
                [p.name for p in table.table_schema.partitions], partitions)
            if partition is None:
                return
            if len(table.table_schema.partitions) != len(partitions):
                filter_all_partitions = False
        else:
            filter_all_partitions = False
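    # Fast path for count(): read the record count straight from the tunnel
    # session instead of downloading any data.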
    if isinstance(expr, Count):
        if not filter_all_partitions:
            # the filter does not pin every partition level, so fall back
            # to ODPS SQL to calculate the count
            return
        try:
            with _open_reader(table, partition=partition) as reader:
                ui.inc(progress_proportion)
                return reader.count
        except ODPSError:
            return
    else:
        logger.info(
            'Try fetching data from tunnel. If it takes a long time, please try running your code '
            'with distributed capabilities, see related section in '
            'https://www.alibabacloud.com/help/en/maxcompute/use-cases/use-a-pyodps-node-to-download'
            '-data-to-a-local-directory-for-processing-or-to-process-data-online for more details.'
        )
        ui.status('Try to download data with tunnel...', clear_keys=True)
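        # A slice with only a stop just caps how many rows are downloaded.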
        if isinstance(expr, SliceCollectionExpr):
            if expr.start:
                raise ExpressionError('For ODPS backend, slice\'s start cannot be specified')
            count = expr.stop
        try:
            data = []
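            # Translate head/tail/slice into a (start, size) window over rows.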
            start, size, step = None, None, None
            if head is not None:
                size = min(head, count) if count is not None else head
            elif tail is not None:
                if filter_all_partitions:
                    start = None if count is None else max(count - tail, 0)
                    size = tail if count is None else min(count, tail)
                else:
                    # tail over multiple partitions: just fall back to SQL
                    return
            else:
                size = count
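            # With the partition fully pinned (or the table unpartitioned)
            # there is a single reader target; otherwise iterate all
            # partitions under the matched prefix.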
            fetch_partitions = [partition] if filter_all_partitions else \
                (p.name for p in table.iterate_partitions(partition))
            if tail is not None:
                fetch_partitions = list(fetch_partitions)[::-1]
            if size is None:
                fetch_partitions = list(fetch_partitions)

            cum = 0
            last_percent = 0
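            # Read partition by partition until the window is filled, bumping
            # the progress UI every update_progress_count rows.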
            for curr_part, partition in izip(itertools.count(1), fetch_partitions):
                rest = size - cum if size is not None else None
                finished = False
                with _open_reader(table, partition=partition) as reader:
                    if tail is not None and start is None:
                        s = max(reader.count - tail, 0)
                        start = s if start is None else max(s, start)
                    unique_columns = list(OrderedDict.fromkeys(columns)) if columns is not None else None
                    for i, r in izip(itertools.count(1),
                                     reader.read(start=start, count=rest, columns=unique_columns)):
                        if size is not None and cum > size - 1:
                            finished = True
                            break
                        cum += 1
                        if cum % update_progress_count == 0:
                            if size is not None:
                                p = float(cum) / size * progress_proportion
                                ui.inc(p - last_percent)
                                last_percent = p
                            else:
                                p = ((curr_part - 1) / len(fetch_partitions) +
                                     float(i) / reader.count / len(fetch_partitions)) * progress_proportion
                                ui.inc(p - last_percent)
                                last_percent = p
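                        # Records from a partition tunnel do not carry the
                        # partition columns; fill their values back in.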
                        if partition:
                            spec = PartitionSpec(partition) if not isinstance(partition, PartitionSpec) \
                                else partition
                            self._fill_back_partition_values(r, table, spec.kv)
                        if columns is None or len(unique_columns) == len(columns):
                            data.append(r.values)
                        else:
                            data.append([r[n] for n in columns])
                if finished:
                    break
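            # Top up the progress bar and wrap the rows into a local frame.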
            if last_percent < progress_proportion:
                ui.inc(progress_proportion - last_percent)
            return ResultFrame(data, schema=expr._schema)
        except NoPermission:
            raise
        except ODPSError:
            return
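# Usage sketch (assumes the standard pyodps DataFrame front end; the table
# name and the ODPS entry object 'o' are hypothetical, and the routing into
# execute() goes through engine internals not shown in this excerpt):
#
#     from odps.df import DataFrame
#     df = DataFrame(o.get_table('my_table'))
#     df.count().execute()   # may be served by the count fast path above
#     df.head(10)            # may be served by a direct tunnel download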