in odps/mars_extension/oscar/dataframe/datasource.py [0:0]
def _execute_arrow_tunnel(cls, ctx, op):
    """Read one split of an ODPS table via the Arrow tunnel and store it in ``ctx``.

    When ``op.table_name`` is ``None`` an empty frame built by
    ``cls._build_empty_df`` is stored and nothing is downloaded.  Otherwise a
    tunnel download session is created for ``op.table_name`` (restricted to
    ``op.partition_spec`` when given) and ``count`` rows are read starting at
    ``op.start_index``, where ``count`` is ``op.nrows`` if set, else
    ``op.end_index - op.start_index``.  The Arrow result gets partition values
    appended, optional string->binary casting, is converted to pandas, indexed
    by ``op.index_columns`` when given, aligned, and stored under
    ``op.outputs[0].key``.

    Creating the session and reading the data are each retried up to
    ``retry_times`` extra times, sleeping 1s, 2s, 3s, ... between attempts.

    :param cls: owning class providing ``_build_empty_df``,
        ``_append_partition_values``, ``_cast_string_to_binary`` and
        ``_align_output_data``.
    :param ctx: mutable mapping that receives the result under the output key.
    :param op: datasource operand describing the table, split and options.
    :raises Exception: the last tunnel error once retries are exhausted.
    """
    out = op.outputs[0]
    if op.table_name is None:
        # is empty table: nothing to download, emit an empty frame
        ctx[out.key] = cls._build_empty_df(out)
        return

    # Imported lazily so the empty-table path does not need the ODPS SDK.
    from odps import ODPS
    from odps.tunnel import TableTunnel

    # Runtime environment variables, when set, override the project and
    # endpoint recorded in the operand.
    odps_params = op.odps_params.copy()
    project = os.environ.get("ODPS_PROJECT_NAME", None)
    if project:
        odps_params["project"] = project
    endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
    o = ODPS(
        odps_params["access_id"],
        odps_params["secret_access_key"],
        project=odps_params["project"],
        endpoint=endpoint,
    )
    t = o.get_table(op.table_name)
    # Name of the ODPS schema (namespace) that holds the table, if any.
    table_schema = t.get_schema()
    schema_name = table_schema.name if table_schema else None
    tunnel = TableTunnel(o, project=t.project, quota_name=op.tunnel_quota_name)

    # NOTE(review): because of ``or``, an explicit ``op.retry_times == 0``
    # falls back to ``options.retry_times`` -- confirm that is intended.
    retry_times = op.retry_times or options.retry_times
    init_sleep_secs = 1

    def _call_with_retries(func, failure_msg):
        """Return ``func()``, retrying up to ``retry_times`` times on error."""
        retries = 0
        while True:
            try:
                return func()
            except Exception:
                # ``Exception`` (not a bare ``except``) so KeyboardInterrupt /
                # SystemExit propagate immediately instead of being retried.
                if retries >= retry_times:
                    raise
                retries += 1
                sleep_secs = retries * init_sleep_secs
                # logger.exception already attaches the current traceback.
                logger.exception(failure_msg, sleep_secs)
                time.sleep(sleep_secs)

    logger.info(
        "Start creating download session for table %s(%s) start index %s end index %s retry_times %s.",
        op.table_name,
        op.partition_spec,
        op.start_index,
        op.end_index,
        retry_times,
    )

    def _create_download_session():
        # ``schema`` is passed for partitioned and non-partitioned tables
        # alike, so tables living in a non-default schema resolve correctly.
        # ``op.partition_spec`` may be None here; the original non-partitioned
        # call simply omitted it (None assumed to be the SDK default).
        return tunnel.create_download_session(
            t.name,
            partition_spec=op.partition_spec,
            schema=schema_name,
            async_mode=True,
        )

    download_session = _call_with_retries(
        _create_download_session,
        "Create download session failed, sleep %s seconds and retry it",
    )

    logger.info(
        "Start reading table %s(%s) split from %s to %s",
        op.table_name,
        op.partition_spec,
        op.start_index,
        op.end_index,
    )
    if op.nrows is None:
        count = op.end_index - op.start_index
    else:
        count = op.nrows

    # Loop-invariant: when a column subset is requested, the index columns
    # must be downloaded as well so they can be turned into the index below.
    columns = op.columns
    if columns is not None:
        columns = list(op.index_columns or []) + list(columns)

    def _read_arrow_table():
        with download_session.open_arrow_reader(
            op.start_index, count, columns=columns
        ) as reader:
            return reader.read()

    table = _call_with_retries(
        _read_arrow_table, "Read table failed, sleep %s seconds and retry it"
    )

    table = cls._append_partition_values(table, op)
    if op.string_as_binary:
        table = cls._cast_string_to_binary(table)
    data = arrow_table_to_pandas_dataframe(
        table, use_arrow_dtype=op.use_arrow_dtype
    )
    if op.index_columns:
        data = data.set_index(op.index_columns)
    data = cls._align_output_data(op, data)
    logger.info(
        "Finish reading table %s(%s) split from %s to %s",
        op.table_name,
        op.partition_spec,
        op.start_index,
        op.end_index,
    )
    ctx[out.key] = data