in odps/mars_extension/legacy/dataframe/datastore.py [0:0]
@classmethod
def _tile_cupid(cls, op):
    # NOTE: os, time, uuid, logger, to_str, build_concatenated_rows_frame
    # and the DataFrameWriteTable* op classes are module-level names in the
    # full file; only the deferred imports below live inside the function.
    from odps import ODPS
    from odps.accounts import BearerTokenAccount
    from cupid import CupidSession, context
    from cupid.runtime import RuntimeContext
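    # Writing through Cupid is only possible from inside a Mars cluster
    # deployed on MaxCompute; bail out early when the runtime context
    # provided by such a cluster is absent.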
    if not RuntimeContext.is_context_ready():
        raise SystemError(
            "No Mars cluster found, please create via `o.create_mars_cluster`."
        )
    cupid_ctx = context()

    bearer_token = cupid_ctx.get_bearer_token()
    account = BearerTokenAccount(bearer_token)
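    # The bearer token authenticates in place of an access-key pair, which
    # is why the ODPS entry below is created with `None` for both key
    # arguments.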
    # Prefer the project / endpoint injected into the cluster environment
    # over the ones carried by the operand.
    project = os.environ.get("ODPS_PROJECT_NAME", None)
    odps_params = op.odps_params.copy()
    if project:
        odps_params["project"] = project
    endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
    o = ODPS(
        None,
        None,
        account=account,
        project=odps_params["project"],
        endpoint=endpoint,
    )
    cupid_session = CupidSession(o)
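    # CupidSession drives the Cupid-specific operations (such as table
    # upload sessions) on top of the plain ODPS entry object.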
    data_src = o.get_table(op.table_name)

    logger.debug("Start creating upload session from cupid.")
    upload_session = cupid_session.create_upload_session(data_src)
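    # The upload session's handle is serialized into every chunk op below,
    # so all workers append blocks to the same pending upload.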
    input_df = build_concatenated_rows_frame(op.inputs[0])
    out_df = op.outputs[0]

    out_chunks = []
    out_chunk_shape = (0,) * len(input_df.shape)
    blocks = {}
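    # Emit one write-split chunk per input chunk; each gets a unique block
    # id that is later committed together with its partition spec.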
    for chunk in input_df.chunks:
        block_id = str(int(time.time())) + "_" + str(uuid.uuid4()).replace("-", "")
        chunk_op = DataFrameWriteTableSplit(
            dtypes=op.dtypes,
            table_name=op.table_name,
            unknown_as_string=op.unknown_as_string,
            partition_spec=op.partition_spec,
            cupid_handle=to_str(upload_session.handle),
            block_id=block_id,
            write_batch_size=op.write_batch_size,
        )
        out_chunk = chunk_op.new_chunk(
            [chunk],
            shape=out_chunk_shape,
            index=chunk.index,
            index_value=out_df.index_value,
            dtypes=chunk.dtypes,
        )
        out_chunks.append(out_chunk)
        blocks[block_id] = op.partition_spec
    # Build the commit tree: fan the write chunks in by groups of
    # `combine_size` through intermediate (non-terminal) commit ops, so the
    # final commit never depends on an unbounded number of predecessors.
    combine_size = 8
    chunks = out_chunks
    while len(chunks) >= combine_size:
        new_chunks = []
        for i in range(0, len(chunks), combine_size):
            chks = chunks[i : i + combine_size]
            if len(chks) == 1:
                chk = chks[0]
            else:
                chk_op = DataFrameWriteTableCommit(dtypes=op.dtypes, is_terminal=False)
                chk = chk_op.new_chunk(
                    chks,
                    shape=out_chunk_shape,
                    index_value=out_df.index_value,
                    dtypes=op.dtypes,
                )
            new_chunks.append(chk)
        chunks = new_chunks

    assert len(chunks) < combine_size
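    # The terminal commit op receives the full block-id -> partition-spec
    # mapping plus the session handle, and performs the actual table commit
    # (honoring `overwrite`) once every write chunk has finished.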
    commit_table_op = DataFrameWriteTableCommit(
        dtypes=op.dtypes,
        table_name=op.table_name,
        blocks=blocks,
        cupid_handle=to_str(upload_session.handle),
        overwrite=op.overwrite,
        odps_params=op.odps_params,
        is_terminal=True,
    )
    commit_table_chunk = commit_table_op.new_chunk(
        chunks,
        shape=out_chunk_shape,
        dtypes=op.dtypes,
        index_value=out_df.index_value,
    )
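    # Writing a table yields no data: the tiled result is a single
    # zero-shaped chunk whose only purpose is to carry the commit.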
    new_op = op.copy()
    return new_op.new_dataframes(
        op.inputs,
        shape=out_df.shape,
        index_value=out_df.index_value,
        dtypes=out_df.dtypes,
        columns_value=out_df.columns_value,
        chunks=[commit_table_chunk],
        nsplits=((0,),) * len(out_chunk_shape),
    )
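# Hedged usage sketch -- the entry points below follow the documented PyODPS
# Mars client API and are assumptions about the caller, not part of this
# module:
#
#     import mars.dataframe as md
#
#     client = o.create_mars_cluster(worker_num=2)   # Mars-on-MaxCompute cluster
#     df = md.DataFrame(raw_pandas_df)               # build a Mars DataFrame
#     o.persist_mars_dataframe(df, "target_table")   # triggers this tile path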