in odps/mars_extension/legacy/dataframe/datasource.py
def _tile_cupid(cls, op):
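    """Tile the read-table operand through Cupid: create download sessions
    for the source table (or its partitions) and emit one
    DataFrameReadTableSplit chunk per split."""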
    from odps import ODPS
    from odps.accounts import BearerTokenAccount
    from cupid import CupidSession, context
    from cupid.errors import CupidError
    from mars.context import get_context
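
    # Build an ODPS entry from the Cupid bearer token; the project and
    # endpoint may be overridden through environment variables.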
    cupid_ctx = context()
    bearer_token = cupid_ctx.get_bearer_token()
    account = BearerTokenAccount(bearer_token)
    project = os.environ.get("ODPS_PROJECT_NAME", None)
    odps_params = op.odps_params.copy()
    if project:
        odps_params["project"] = project
    endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
    o = ODPS(
        None,
        None,
        account=account,
        project=odps_params["project"],
        endpoint=endpoint,
    )
    cupid_session = CupidSession(o)
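
    # Collect output metadata and the target split size in bytes, narrowing
    # dtypes, shape and columns when a column subset is requested.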
    mars_context = get_context()

    df = op.outputs[0]
    split_size = df.extra_params.chunk_bytes or READ_CHUNK_LIMIT

    out_dtypes = df.dtypes
    out_shape = df.shape
    out_columns_value = df.columns_value
    if op.columns is not None:
        out_dtypes = out_dtypes[op.columns]
        out_shape = (df.shape[0], len(op.columns))
        out_columns_value = parse_index(out_dtypes.index, store_data=True)
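
    # Resolve the data sources to read: the whole table, a single existing
    # partition, or a (possibly filtered) list of partitions.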
    table_obj = o.get_table(op.table_name)
    if not table_obj.table_schema.partitions:
        data_srcs = [table_obj]
    elif op.partition is not None and check_partition_exist(
        table_obj, op.partition
    ):
        data_srcs = [table_obj.get_partition(op.partition)]
    else:
        data_srcs = list(table_obj.partitions)
        if op.partition is not None:
            data_srcs = filter_partitions(o, data_srcs, op.partition)

    out_chunks = []
    chunk_idx = 0
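
    # Create a Cupid download session for every data source and turn its
    # splits into chunks of the output DataFrame.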
    for data_src in data_srcs:
        try:
            data_store_size = data_src.size
        except ODPSError:
            # fail to get data size, just ignore
            pass
        else:
            if data_store_size < split_size and mars_context is not None:
                # get worker counts
                worker_count = max(len(mars_context.get_worker_addresses()), 1)
                # data is too small, split as many as number of cores
                split_size = data_store_size // worker_count
                # at least 1M
                split_size = max(split_size, 1 * 1024**2)
                logger.debug(
                    "Input data size is too small, split_size is %s", split_size
                )

        logger.debug(
            "Start creating download session of table %s from cupid, "
            "columns: %s",
            op.table_name,
            op.columns,
        )
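
        # Create the download session, doubling split_size and retrying when
        # Cupid rejects the request for producing too many splits.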
        while True:
            try:
                download_session = cupid_session.create_download_session(
                    data_src,
                    split_size=split_size,
                    columns=op.columns,
                    with_split_meta=op.with_split_meta_on_tile,
                )
                break
            except CupidError:
                logger.debug(
                    "The number of splits exceeds 100000, split_size is %s",
                    split_size,
                )
                if split_size >= MAX_CHUNK_SIZE:
                    raise
                else:
                    split_size *= 2

        logger.debug(
            "%s table splits have been created.", str(len(download_session.splits))
        )
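
        # Estimate rows per chunk: prefer the split metadata and fall back to
        # prorating the known total row count by each split's file size.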
        meta_chunk_rows = [
            split.meta_row_count for split in download_session.splits
        ]
        if np.isnan(out_shape[0]):
            est_chunk_rows = meta_chunk_rows
        else:
            sp_file_sizes = np.array(
                [
                    sp.split_file_end - sp.split_file_start
                    for sp in download_session.splits
                ]
            )
            total_size = sp_file_sizes.sum()
            ratio_chunk_rows = (sp_file_sizes * out_shape[0] // total_size).tolist()
            est_chunk_rows = [
                mr if mr is not None else rr
                for mr, rr in zip(meta_chunk_rows, ratio_chunk_rows)
            ]

        partition_spec = (
            str(data_src.partition_spec)
            if getattr(data_src, "partition_spec", None)
            else None
        )

        logger.warning("Estimated chunk rows: %r", est_chunk_rows)
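
        # An empty data source still yields one placeholder chunk so that the
        # tiled graph stays well-formed.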
        if len(download_session.splits) == 0:
            logger.debug("Table %s has no data", op.table_name)
            chunk_op = DataFrameReadTableSplit()
            index_value = parse_index(pd.RangeIndex(0))
            columns_value = parse_index(out_dtypes.index, store_data=True)
            out_chunk = chunk_op.new_chunk(
                None,
                shape=(np.nan, out_shape[1]),
                dtypes=op.dtypes,
                index_value=index_value,
                columns_value=columns_value,
                index=(chunk_idx, 0),
            )
            out_chunks.append(out_chunk)
            chunk_idx += 1
        else:
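            # Emit one DataFrameReadTableSplit chunk per Cupid split, carrying
            # the split handle and byte ranges needed to download the data.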
            for idx, split in enumerate(download_session.splits):
                chunk_op = DataFrameReadTableSplit(
                    cupid_handle=to_str(split.handle),
                    split_index=split.split_index,
                    split_file_start=split.split_file_start,
                    split_file_end=split.split_file_end,
                    schema_file_start=split.schema_file_start,
                    schema_file_end=split.schema_file_end,
                    add_offset=op.add_offset,
                    dtypes=out_dtypes,
                    sparse=op.sparse,
                    split_size=split_size,
                    string_as_binary=op.string_as_binary,
                    use_arrow_dtype=op.use_arrow_dtype,
                    estimate_rows=est_chunk_rows[idx],
                    partition_spec=partition_spec,
                    append_partitions=op.append_partitions,
                    meta_raw_size=split.meta_raw_size,
                    nrows=meta_chunk_rows[idx] or op.nrows,
                    memory_scale=op.memory_scale,
                    index_columns=op.index_columns,
                )
                # the chunk shape is unknown
                index_value = parse_index(pd.RangeIndex(0))
                columns_value = parse_index(out_dtypes.index, store_data=True)
                out_chunk = chunk_op.new_chunk(
                    None,
                    shape=(np.nan, out_shape[1]),
                    dtypes=out_dtypes,
                    index_value=index_value,
                    columns_value=columns_value,
                    index=(chunk_idx, 0),
                )
                chunk_idx += 1
                out_chunks.append(out_chunk)
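
    # With add_offset, rebuild a continuous RangeIndex across all output
    # chunks after tiling.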
    if op.add_offset:
        out_chunks = standardize_range_index(out_chunks)

    new_op = op.copy()
    nsplits = ((np.nan,) * len(out_chunks), (out_shape[1],))
    return new_op.new_dataframes(
        None,
        shape=out_shape,
        dtypes=op.dtypes,
        index_value=df.index_value,
        columns_value=out_columns_value,
        chunks=out_chunks,
        nsplits=nsplits,
    )