in odps/mars_extension/oscar/dataframe/datastore.py [0:0]
@classmethod
def _execute_arrow_tunnel(cls, ctx, op):
    from odps import ODPS
    from odps.tunnel import TableTunnel
    import pyarrow as pa
    import pandas as pd
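    # Note: os, time, logger, options and ActorNotExist are assumed to be
    # provided by module-level imports elsewhere in this file.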
    project = os.environ.get("ODPS_PROJECT_NAME", None)
    odps_params = op.odps_params.copy()
    if project:
        odps_params["project"] = project
    endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
    o = ODPS(
        odps_params["access_id"],
        odps_params["secret_access_key"],
        project=odps_params["project"],
        endpoint=endpoint,
    )
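    # Look up the target table and create a tunnel client bound to its project.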
    t = o.get_table(op.table_name)
    tunnel = TableTunnel(o, project=t.project, quota_name=op.tunnel_quota_name)
    retry_times = options.retry_times
    init_sleep_secs = 1
    split_index = op.inputs[0].index
    logger.info(
        "Start creating upload session for table %s split index %s retry_times %s.",
        op.table_name,
        split_index,
        retry_times,
    )
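    # Create the upload session, retrying with a linear back-off
    # (retries * init_sleep_secs) for at most retry_times attempts.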
    retries = 0
    while True:
        try:
            if op.partition_spec is not None:
                upload_session = tunnel.create_upload_session(
                    t.name, partition_spec=op.partition_spec
                )
            else:
                upload_session = tunnel.create_upload_session(t.name)
            break
        except:  # retry on any error
            if retries >= retry_times:
                raise
            retries += 1
            sleep_secs = retries * init_sleep_secs
            logger.exception(
                "Failed to create upload session, sleeping %s seconds before retrying",
                sleep_secs,
            )
            time.sleep(sleep_secs)
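    # Write the chunk's pandas data into the session as a single Arrow record
    # batch (block 0), using the same retry policy as above.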
    logger.info(
        "Start writing table %s. split_index: %s tunnel_session: %s",
        op.table_name,
        split_index,
        upload_session.id,
    )
    retries = 0
    while True:
        try:
            writer = upload_session.open_arrow_writer(0)
            arrow_rb = pa.RecordBatch.from_pandas(ctx[op.inputs[0].key])
            writer.write(arrow_rb)
            writer.close()
            break
        except:  # retry on any error
            if retries >= retry_times:
                raise
            retries += 1
            sleep_secs = retries * init_sleep_secs
            logger.exception(
                "Failed to write data, sleeping %s seconds before retrying",
                sleep_secs,
            )
            time.sleep(sleep_secs)
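    # Coordinate the commit through the remote commit recorder so that only one
    # speculative copy of this split actually commits the upload session.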
    recorder_name = op.commit_recorder_name
    try:
        recorder = ctx.get_remote_object(recorder_name)
    except ActorNotExist:
        # The recorder is gone -- the write has already been finished elsewhere,
        # so just wait for the speculative scheduler to cancel this task.
        while True:
            logger.info(
                "Writing table %s has been finished, waiting to be canceled by speculative scheduler",
                op.table_name,
            )
            time.sleep(3)
    can_commit, can_destroy = recorder.try_commit(split_index)
    if can_commit:
        # FIXME If this commit failed or the process crashed, the whole write will still raise error.
        # But this situation is very rare so we skip the error handling.
        logger.info(
            "Committing to table %s with upload session %s",
            op.table_name,
            upload_session.id,
        )
        upload_session.commit([0])
        logger.info(
            "Finish writing table %s. split_index: %s tunnel_session: %s",
            op.table_name,
            split_index,
            upload_session.id,
        )
    else:
        logger.info(
            "Skip writing table %s. split_index: %s", op.table_name, split_index
        )
    if can_destroy:
        try:
            ctx.destroy_remote_object(recorder_name)
            logger.info("Delete remote object %s", recorder_name)
        except ActorNotExist:
            pass
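    # The datastore op has no real output data; emit an empty DataFrame chunk.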
    ctx[op.outputs[0].key] = pd.DataFrame()