in odps/mars_extension/oscar/dataframe/datasource.py [0:0]
def _tile_tunnel(cls, op):
    """Tile a table-read operand into row-range chunks read through the tunnel.

    The table (or its selected partitions) is split into consecutive row
    ranges; every range becomes one ``DataFrameReadTableSplit`` chunk stacked
    along axis 0. Sources that select no partition or that report a size of
    zero contribute a single zero-row placeholder chunk.

    Parameters
    ----------
    op
        The operand being tiled. The attributes read here are
        ``odps_params``, ``table_name``, ``partition``, ``columns``,
        ``outputs``, ``retry_times`` and the options forwarded verbatim to
        every ``DataFrameReadTableSplit``.

    Returns
    -------
    The value of ``new_dataframes`` called on a copy of ``op``, carrying the
    generated chunks and their nsplits.

    Raises
    ------
    Exception
        Whatever ``open_reader`` raised last, once the retry budget
        (``op.retry_times`` or ``options.retry_times``) is exhausted.
    """
    from odps import ODPS

    # Values from the runtime environment take precedence over the ones
    # stored on the operand.
    project = os.environ.get("ODPS_PROJECT_NAME", None)
    odps_params = op.odps_params.copy()
    if project:
        odps_params["project"] = project
    endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
    o = ODPS(
        odps_params["access_id"],
        odps_params["secret_access_key"],
        project=odps_params["project"],
        endpoint=endpoint,
    )

    table_obj = o.get_table(op.table_name)
    if not table_obj.table_schema.partitions:
        # Table has no partition columns: read the table itself.
        data_srcs = [table_obj]
    elif op.partition is not None and check_partition_exist(table_obj, op.partition):
        data_srcs = [table_obj.get_partition(op.partition)]
    else:
        data_srcs = list(table_obj.partitions)
        if op.partition is not None:
            # NOTE(review): presumably narrows the partitions to those
            # matching ``op.partition`` -- confirm against filter_partitions.
            data_srcs = filter_partitions(o, data_srcs, op.partition)

    out_chunks = []
    row_nsplits = []
    index_start = 0
    df = op.outputs[0]

    out_dtypes = df.dtypes
    out_shape = df.shape
    out_columns_value = df.columns_value
    if op.columns is not None:
        # Restrict the output metadata to the requested columns.
        out_dtypes = out_dtypes[op.columns]
        out_shape = (df.shape[0], len(op.columns))
        out_columns_value = parse_index(out_dtypes.index, store_data=True)

    def _append_empty_chunk(row_index):
        # Register a zero-row placeholder chunk at chunk row ``row_index``.
        # Its dtypes come from ``out_dtypes`` so they agree with the declared
        # width, and a matching 0 is recorded in ``row_nsplits`` so nsplits
        # stay aligned with the chunk list.
        empty_op = DataFrameReadTableSplit()
        empty_chunk = empty_op.new_chunk(
            None,
            shape=(0, out_shape[1]),
            dtypes=out_dtypes,
            index_value=parse_index(pd.RangeIndex(0)),
            columns_value=parse_index(out_dtypes.index, store_data=True),
            index=(row_index, 0),
        )
        out_chunks.append(empty_chunk)
        row_nsplits.append(0)

    def _read_record_count(data_src, retry_times):
        # Fetch the record count, retrying with a one second pause between
        # attempts; the last error propagates once ``retry_times`` retries
        # have been used up.
        retries = 0
        while True:
            try:
                with data_src.open_reader() as reader:
                    return reader.count
            except Exception:
                # ``Exception`` rather than a bare ``except`` so that
                # KeyboardInterrupt / SystemExit abort immediately.
                if retries >= retry_times:
                    raise
                retries += 1
                time.sleep(1)

    if len(data_srcs) == 0:
        # no partitions are selected
        _append_empty_chunk(index_start)
    else:
        retry_times = op.retry_times or options.retry_times
        for data_src in data_srcs:
            data_store_size = data_src.size
            record_count = _read_record_count(data_src, retry_times)

            if data_store_size == 0:
                # empty table
                _append_empty_chunk(index_start)
                index_start += 1
                continue

            chunk_size = df.extra_params.chunk_size
            partition_spec = (
                str(data_src.partition_spec)
                if getattr(data_src, "partition_spec", None)
                else None
            )

            if chunk_size is None:
                # Size-driven split: derive the number of chunks from the
                # stored size, then spread the records evenly over them.
                chunk_bytes = df.extra_params.chunk_bytes or CHUNK_BYTES_LIMIT
                chunk_count = data_store_size // chunk_bytes + (
                    data_store_size % chunk_bytes != 0
                )
                chunk_size = ceildiv(record_count, chunk_count)
                split_size = chunk_bytes
            else:
                # Row-driven split: the caller fixed the rows per chunk.
                chunk_count = ceildiv(record_count, chunk_size)
                split_size = data_store_size // chunk_count

            for i in range(chunk_count):
                start_index = chunk_size * i
                end_index = min(chunk_size * (i + 1), record_count)
                row_size = end_index - start_index
                chunk_op = DataFrameReadTableSplit(
                    table_name=op.table_name,
                    partition_spec=partition_spec,
                    start_index=start_index,
                    end_index=end_index,
                    nrows=op.nrows,
                    odps_params=op.odps_params,
                    columns=op.columns,
                    index_type=op.index_type,
                    dtypes=out_dtypes,
                    sparse=op.sparse,
                    split_size=split_size,
                    use_arrow_dtype=op.use_arrow_dtype,
                    estimate_rows=row_size,
                    append_partitions=op.append_partitions,
                    memory_scale=op.memory_scale,
                    retry_times=op.retry_times,
                    tunnel_quota_name=op.tunnel_quota_name,
                    index_columns=op.index_columns,
                    extra_params=op.extra_params,
                )
                index_value = parse_index(pd.RangeIndex(start_index, end_index))
                columns_value = parse_index(out_dtypes.index, store_data=True)
                out_chunk = chunk_op.new_chunk(
                    None,
                    shape=(row_size, out_shape[1]),
                    dtypes=out_dtypes,
                    index_value=index_value,
                    columns_value=columns_value,
                    index=(index_start + i, 0),
                )
                row_nsplits.append(row_size)
                out_chunks.append(out_chunk)
            index_start += chunk_count

    if op.index_type == "incremental" and _NEED_STANDARDIZE:
        # NOTE(review): presumably rewrites the per-source RangeIndex values
        # into one continuous range -- confirm in standardize_range_index.
        out_chunks = standardize_range_index(out_chunks)

    new_op = op.copy()
    nsplits = (tuple(row_nsplits), (out_shape[1],))
    return new_op.new_dataframes(
        None,
        shape=out_shape,
        # Column-pruned dtypes, consistent with ``shape`` and
        # ``columns_value`` below.
        dtypes=out_dtypes,
        index_value=df.index_value,
        columns_value=out_columns_value,
        chunks=out_chunks,
        nsplits=nsplits,
    )