in odps/mars_extension/legacy/dataframe/datasource.py [0:0]
def _tile_tunnel(cls, op):
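    # Credentials and endpoint come from op.odps_params, with environment
    # overrides (ODPS_PROJECT_NAME / ODPS_RUNTIME_ENDPOINT) taking precedence.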
    from odps import ODPS

    project = os.environ.get("ODPS_PROJECT_NAME", None)
    odps_params = op.odps_params.copy()
    if project:
        odps_params["project"] = project
    endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
    o = ODPS(
        odps_params["access_id"],
        odps_params["secret_access_key"],
        project=odps_params["project"],
        endpoint=endpoint,
    )
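
    # Pick the data sources to scan: the whole table when it is not
    # partitioned, the single matching partition, or all partitions
    # (optionally narrowed by the requested partition spec).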
    table_obj = o.get_table(op.table_name)
    if not table_obj.table_schema.partitions:
        data_srcs = [table_obj]
    elif op.partition is not None and check_partition_exist(table_obj, op.partition):
        data_srcs = [table_obj.get_partition(op.partition)]
    else:
        data_srcs = list(table_obj.partitions)
        if op.partition is not None:
            data_srcs = filter_partitions(o, data_srcs, op.partition)
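
    # Accumulators for the tiled chunks, plus output metadata (dtypes, shape,
    # columns) pruned to the requested column subset when one is given.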
    out_chunks = []
    row_nsplits = []
    index_start = 0
    df = op.outputs[0]

    out_dtypes = df.dtypes
    out_shape = df.shape
    out_columns_value = df.columns_value
    if op.columns is not None:
        out_dtypes = out_dtypes[op.columns]
        out_shape = (df.shape[0], len(op.columns))
        out_columns_value = parse_index(out_dtypes.index, store_data=True)
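
    # Tile each data source (the table itself or one of its partitions)
    # into row-wise chunks.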
    for data_src in data_srcs:
        data_store_size = data_src.size
        shape = out_shape
        chunk_size = df.extra_params.chunk_size
        partition_spec = (
            str(data_src.partition_spec)
            if getattr(data_src, "partition_spec", None)
            else None
        )
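        # Chunk sizing: with no explicit chunk_size, derive the rows per chunk
        # from a byte budget (chunk_bytes, defaulting to READ_CHUNK_LIMIT);
        # otherwise estimate the bytes per chunk from the requested row count.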
        if chunk_size is None:
            chunk_bytes = df.extra_params.chunk_bytes or READ_CHUNK_LIMIT
            chunk_count = ceildiv(data_store_size, chunk_bytes)
            chunk_size = ceildiv(shape[0], chunk_count)
            split_size = chunk_bytes
        else:
            chunk_count = ceildiv(shape[0], chunk_size)
            split_size = data_store_size // chunk_count
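        # Emit one DataFrameReadTableSplit chunk per contiguous row range.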
        for i in range(chunk_count):
            start_index = chunk_size * i
            end_index = min(chunk_size * (i + 1), shape[0])
            row_size = end_index - start_index
            chunk_op = DataFrameReadTableSplit(
                table_name=op.table_name,
                partition_spec=partition_spec,
                start_index=start_index,
                end_index=end_index,
                nrows=op.nrows,
                odps_params=op.odps_params,
                columns=op.columns,
                add_offset=op.add_offset,
                dtypes=out_dtypes,
                sparse=op.sparse,
                split_size=split_size,
                use_arrow_dtype=op.use_arrow_dtype,
                estimate_rows=row_size,
                append_partitions=op.append_partitions,
                memory_scale=op.memory_scale,
                index_columns=op.index_columns,
            )
            index_value = parse_index(pd.RangeIndex(start_index, end_index))
            columns_value = parse_index(out_dtypes.index, store_data=True)
            out_chunk = chunk_op.new_chunk(
                None,
                shape=(row_size, out_shape[1]),
                dtypes=out_dtypes,
                index_value=index_value,
                columns_value=columns_value,
                index=(index_start + i, 0),
            )
            row_nsplits.append(row_size)
            out_chunks.append(out_chunk)
        index_start += chunk_count
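
    # When offsets are requested, rebuild a contiguous RangeIndex across
    # the emitted chunks.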
    if op.add_offset:
        out_chunks = standardize_range_index(out_chunks)

    new_op = op.copy()
    nsplits = (tuple(row_nsplits), (out_shape[1],))
    return new_op.new_dataframes(
        None,
        shape=out_shape,
        # Use the (possibly column-pruned) dtypes so the dataframe metadata
        # stays consistent with out_shape and out_columns_value.
        dtypes=out_dtypes,
        index_value=df.index_value,
        columns_value=out_columns_value,
        chunks=out_chunks,
        nsplits=nsplits,
    )