in odps/df/backends/odpssql/engine.py [0:0]
def _fetch(self, expr, src_expr, instance, ui, progress_proportion=1,
           cache_data=None, head=None, tail=None, use_tunnel=True,
           group=None, finish=True):
    """Fetch the result of an executed expression into local memory.

    If ``cache_data`` is given, records are read from it, first by table
    tunnel (when ``use_tunnel``) and otherwise, or on an ``ODPSError``, via
    ``cache_data.head``. If ``cache_data`` is None, records are read from
    ``instance``'s result reader.

    :param expr: expression whose schema describes the fetched records
    :param src_expr: expression the result is cached under via
        ``context.cache`` (its schema may be reset, see below)
    :param instance: instance whose results are read when ``cache_data``
        is None
    :param ui: progress UI receiving status messages and ``inc`` calls
    :param progress_proportion: amount passed to ``ui.inc`` for this fetch
    :param cache_data: object exposing ``open_reader``/``head`` that holds
        the result (presumably a table -- confirm against callers)
    :param head: if truthy, only the first ``head`` records are returned
    :param tail: if truthy (and ``head`` is not), only the last ``tail``
        records are returned
    :param use_tunnel: whether to try a tunnel download of ``cache_data``
    :param group: UI keys removed (when ``finish``) before reading
        ``cache_data``
    :param finish: when true, emit the cached-data status messages and
        remove ``group`` keys
    :return: a ``ResultFrame``, or for a ``Scalar`` ``src_expr`` the single
        validated value
    :raises NotImplementedError: if ``tail`` is requested for
        ``cache_data`` but the tunnel download is disabled or failed
    """
    if isinstance(expr, (CollectionExpr, Summary)):
        df_schema = expr._schema
        schema = types.df_schema_to_odps_schema(expr._schema, ignorecase=True)
    elif isinstance(expr, SequenceExpr):
        df_schema = TableSchema.from_lists([expr.name], [expr._data_type])
        schema = types.df_schema_to_odps_schema(df_schema, ignorecase=True)
    else:
        df_schema = None
        schema = None

    def _slice(reader):
        # Apply the ``head`` / ``tail`` limits (head wins) to a record reader.
        if head:
            return reader[:head]
        if tail:
            start = max(reader.count - tail, 0)
            return reader[start:]
        return reader

    if cache_data is not None:
        if group and finish:
            ui.remove_keys(group)
        try:
            if use_tunnel:
                try:
                    if finish:
                        ui.status('Start to use tunnel to download results...')
                    with cache_data.open_reader(reopen=True) as reader:
                        return ResultFrame([r.values for r in _slice(reader)],
                                           schema=df_schema)
                except ODPSError as ex:
                    # some projects have closed the tunnel download;
                    # warn and fall back to ``cache_data.head`` below
                    warnings.warn('Failed to download data by table tunnel, 10000 records will be limited.\n' +
                                  'Cause: ' + str(ex))
            if tail:
                raise NotImplementedError(
                    'tail is only supported when downloading results by table tunnel')
            if finish:
                ui.status('Start to use head to download results...')
            return ResultFrame(cache_data.head(head or 10000), schema=df_schema)
        finally:
            # Runs exactly once, whichever download path was taken, so a
            # tunnel failure followed by the head fallback does not count
            # progress twice.
            context.cache(src_expr, cache_data)
            # reset schema: the cached data has the resolved schema, so an
            # unresolved (dynamic / Unknown-typed) source schema is replaced
            if isinstance(src_expr, CollectionExpr) and (
                    isinstance(src_expr._schema, DynamicSchema) or
                    any(isinstance(col.type, Unknown)
                        for col in src_expr._schema.columns)):
                src_expr._schema = df_schema
            ui.inc(progress_proportion)

    with instance.open_reader(schema=schema, use_tunnel=False) as reader:
        # NOTE(review): unlike the cached-data messages this one is not gated
        # by ``finish`` -- confirm that is intended.
        ui.status('Start to read instance results...')
        if isinstance(src_expr, Scalar):
            ui.inc(progress_proportion)
            odps_type = types.df_type_to_odps_type(src_expr._value_type, project=instance.project)
            res = types.odps_types.validate_value(reader[0][0], odps_type)
            context.cache(src_expr, res)
            return res
        try:
            return ResultFrame([r.values for r in _slice(reader)], schema=df_schema)
        finally:
            # cache_data is always None here: the branch above returns or
            # raises whenever it is set.
            # NOTE(review): confirm caching None for src_expr is intended.
            context.cache(src_expr, cache_data)
            ui.inc(progress_proportion)