in odps/mars_extension/oscar/dataframe/datasource.py
def _tile_cupid(cls, op):
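    """Tile a Cupid-backed table read into one chunk per table split.

    Enumerates table partitions, opens a Cupid download session per
    partition, estimates per-chunk row counts, and emits one
    ``DataFrameReadTableSplit`` chunk per split.
    """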
    import numpy as np
    import pandas as pd
    from mars.core.context import get_context
    df = op.outputs[0]
    split_size = df.extra_params.chunk_bytes or CHUNK_BYTES_LIMIT

    out_dtypes = df.dtypes
    out_shape = df.shape
    out_columns_value = df.columns_value
    if op.columns is not None:
        # Column pruning: narrow dtypes, shape and columns index to the
        # requested subset before building chunks.
        out_dtypes = out_dtypes[op.columns]
        out_shape = (df.shape[0], len(op.columns))
        out_columns_value = parse_index(out_dtypes.index, store_data=True)
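
    # Size the download session by the number of live Mars workers when a
    # scheduling context is available; otherwise leave the count unset.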
    mars_context = get_context()
    if mars_context is not None:
        worker_count = len(mars_context.get_worker_addresses())
    else:
        worker_count = None
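
    # All Cupid RPCs below go through a single service client; it is closed
    # in the finally block even if session creation fails midway.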
    cupid_client = CupidServiceClient()
    try:
        parts = cupid_client.enum_table_partitions(
            op.odps_params, op.table_name, op.partition
        )
        if parts is None:
            # Non-partitioned table: run the loop once with no partition spec.
            parts = [None]
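
        # One download session is created per partition; each session
        # returns splits that map one-to-one onto output chunks.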
        out_chunks = []
        chunk_idx = 0
        for partition_spec in parts:
            columns = op.columns
            if columns is not None:
                # Index columns must be downloaded as well so they can be
                # promoted to the index after reading.
                columns = (op.index_columns or []) + columns
            splits, split_size = cupid_client.create_table_download_session(
                op.odps_params,
                op.table_name,
                partition_spec,
                columns,
                worker_count,
                split_size,
                MAX_CHUNK_NUM,
                op.with_split_meta_on_tile,
            )
            logger.info("%s table splits have been created.", len(splits))
            meta_chunk_rows = [split.meta_row_count for split in splits]
            if np.isnan(out_shape[0]):
                # Total row count unknown: rely on per-split meta counts only.
                est_chunk_rows = meta_chunk_rows
            else:
                # Distribute the known total row count over splits in
                # proportion to their file sizes, then prefer exact counts
                # from split meta wherever they are available.
                sp_file_sizes = np.array(
                    [sp.split_file_end - sp.split_file_start for sp in splits]
                )
                total_size = sp_file_sizes.sum()
                ratio_chunk_rows = (
                    sp_file_sizes * out_shape[0] // total_size
                ).tolist()
                est_chunk_rows = [
                    mr if mr is not None else rr
                    for mr, rr in zip(meta_chunk_rows, ratio_chunk_rows)
                ]
            logger.warning("Estimated chunk rows: %r", est_chunk_rows)
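
            # Example: with out_shape[0] == 100 and split file sizes
            # [30, 70], the ratio estimate yields [30, 70] rows; meta row
            # counts override these wherever the session provides them.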
            if len(splits) == 0:
                # Empty table or partition: emit a single placeholder chunk
                # so downstream tiling still sees a well-formed DataFrame.
                logger.info("Table %s has no data", op.table_name)
                chunk_op = DataFrameReadTableSplit()
                index_value = parse_index(pd.RangeIndex(0))
                columns_value = parse_index(out_dtypes.index, store_data=True)
                out_chunk = chunk_op.new_chunk(
                    None,
                    shape=(np.nan, out_shape[1]),
                    # use the pruned dtypes so they agree with out_shape and
                    # columns_value (op.dtypes may still hold all columns)
                    dtypes=out_dtypes,
                    index_value=index_value,
                    columns_value=columns_value,
                    index=(chunk_idx, 0),
                )
                out_chunks.append(out_chunk)
                chunk_idx += 1
            else:
                for idx, split in enumerate(splits):
                    chunk_op = DataFrameReadTableSplit(
                        cupid_handle=to_str(split.handle),
                        split_index=split.split_index,
                        split_file_start=split.split_file_start,
                        split_file_end=split.split_file_end,
                        schema_file_start=split.schema_file_start,
                        schema_file_end=split.schema_file_end,
                        index_type=op.index_type,
                        dtypes=out_dtypes,
                        sparse=op.sparse,
                        split_size=split_size,
                        string_as_binary=op.string_as_binary,
                        use_arrow_dtype=op.use_arrow_dtype,
                        estimate_rows=est_chunk_rows[idx],
                        partition_spec=partition_spec,
                        append_partitions=op.append_partitions,
                        meta_raw_size=split.meta_raw_size,
                        nrows=meta_chunk_rows[idx] or op.nrows,
                        memory_scale=op.memory_scale,
                        index_columns=op.index_columns,
                        extra_params=op.extra_params,
                    )
                    # the chunk shape is unknown until the split is read
                    index_value = parse_index(pd.RangeIndex(0))
                    columns_value = parse_index(out_dtypes.index, store_data=True)
                    out_chunk = chunk_op.new_chunk(
                        None,
                        shape=(np.nan, out_shape[1]),
                        dtypes=out_dtypes,
                        index_value=index_value,
                        columns_value=columns_value,
                        index=(chunk_idx, 0),
                    )
                    chunk_idx += 1
                    out_chunks.append(out_chunk)
    finally:
        cupid_client.close()
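
    # Rebuild a contiguous RangeIndex across chunks when an incremental
    # index was requested and this Mars version needs explicit
    # standardization (_NEED_STANDARDIZE).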
    if op.index_type == "incremental" and _NEED_STANDARDIZE:
        out_chunks = standardize_range_index(out_chunks)
    new_op = op.copy()
    # Per-chunk row counts are unknown at tile time, hence the NaN entries
    # along the row axis of nsplits.
    nsplits = ((np.nan,) * len(out_chunks), (out_shape[1],))
    return new_op.new_dataframes(
        None,
        shape=out_shape,
        # use the pruned dtypes so they agree with out_shape and
        # out_columns_value when op.columns is set
        dtypes=out_dtypes,
        index_value=df.index_value,
        columns_value=out_columns_value,
        chunks=out_chunks,
        nsplits=nsplits,
    )
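
# Hedged sketch (assumption, not part of the original excerpt): in this
# module the public tile entry typically dispatches between the Cupid path
# above and a tunnel-based fallback, roughly:
#
#     @classmethod
#     def tile(cls, op):
#         # assumption: CUPID_SERVICE_SOCKET marks execution inside a
#         # Cupid cluster; otherwise fall back to tunnel download
#         if os.environ.get("CUPID_SERVICE_SOCKET"):
#             return cls._tile_cupid(op)
#         return cls._tile_tunnel(op)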