#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import warnings
import time
import numpy as np
import pandas as pd
from mars.utils import parse_readable_size, ceildiv
from mars.serialization.serializables import (
StringField,
Int64Field,
ListField,
SeriesField,
DictField,
BoolField,
AnyField,
FieldTypes,
)
from mars.dataframe import ArrowStringDtype
from mars.dataframe.core import DATAFRAME_CHUNK_TYPE
from mars.dataframe.utils import (
parse_index,
standardize_range_index,
arrow_table_to_pandas_dataframe,
)
from mars.dataframe.datasource.core import ColumnPruneSupportedDataSourceMixin
from mars.core import OutputType
from ....config import options
from ....df.backends.odpssql.types import odps_type_to_df_type
from ....utils import to_str, to_timestamp
from ...utils import check_partition_exist, filter_partitions
from ..cupid_service import CupidServiceClient

logger = logging.getLogger(__name__)

CHUNK_BYTES_LIMIT = 64 * 1024**2
MAX_CHUNK_NUM = 512 * 1024**2
ORC_COMPRESSION_RATIO = 5
STRING_FIELD_OVERHEAD = 50

try:
from mars.dataframe.datasource.core import (
IncrementalIndexDatasource,
IncrementalIndexDataSourceMixin,
)
_BASE = (
IncrementalIndexDatasource,
ColumnPruneSupportedDataSourceMixin,
IncrementalIndexDataSourceMixin,
)
_NEED_STANDARDIZE = False
except ImportError:
from mars.dataframe.datasource.core import HeadOptimizedDataSource
_BASE = (HeadOptimizedDataSource, ColumnPruneSupportedDataSourceMixin)
_NEED_STANDARDIZE = True


class DataFrameReadTable(*_BASE):
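    """
    Operand that reads an ODPS (MaxCompute) table into a Mars DataFrame.

    Tiling goes through the Cupid service when ``CUPID_SERVICE_SOCKET`` is
    present in the environment, otherwise through the ODPS tunnel, and
    produces one ``DataFrameReadTableSplit`` chunk per split. Column
    pruning is supported via ``get_columns`` and ``set_pruned_columns``.
    """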
_op_type_ = 123450
odps_params = DictField("odps_params")
table_name = StringField("table_name")
partition_spec = StringField("partition_spec", default=None)
dtypes = SeriesField("dtypes", default=None)
index_type = StringField("index_type", default=None)
columns = AnyField("columns", default=None)
nrows = Int64Field("nrows", default=None)
use_arrow_dtype = BoolField("use_arrow_dtype", default=None)
string_as_binary = BoolField("string_as_binary", default=None)
append_partitions = BoolField("append_partitions", default=None)
last_modified_time = Int64Field("last_modified_time", default=None)
with_split_meta_on_tile = BoolField("with_split_meta_on_tile", default=None)
retry_times = Int64Field("retry_times", default=None)
tunnel_quota_name = StringField("tunnel_quota_name", default=None)
index_columns = ListField("index_columns", FieldTypes.string, default=None)
index_dtypes = SeriesField("index_dtypes", default=None)
def __init__(self, sparse=None, memory_scale=None, **kw):
super(DataFrameReadTable, self).__init__(
sparse=sparse,
memory_scale=memory_scale,
_output_types=[OutputType.dataframe],
**kw
)
@property
def retryable(self):
if "CUPID_SERVICE_SOCKET" in os.environ:
return False
else:
return True
@property
def partition(self):
return getattr(self, "partition_spec", None)
@property
def incremental_index(self):
return self.index_type == "incremental"
def get_columns(self):
return self.columns
def set_pruned_columns(self, columns, *, keep_order=None): # pragma: no cover
self.columns = columns
def __call__(self, shape, chunk_bytes=None, chunk_size=None):
if not self.index_columns:
if np.isnan(shape[0]):
index_value = parse_index(pd.RangeIndex(0))
else:
index_value = parse_index(pd.RangeIndex(shape[0]))
elif len(self.index_columns) == 1:
            index_value = parse_index(pd.Index([]).astype(self.index_dtypes.iloc[0]))
else:
idx = pd.MultiIndex.from_frame(
pd.DataFrame([], columns=self.index_columns).astype(self.index_dtypes)
)
index_value = parse_index(idx)
columns_value = parse_index(self.dtypes.index, store_data=True)
return self.new_dataframe(
None,
shape,
dtypes=self.dtypes,
index_value=index_value,
columns_value=columns_value,
chunk_bytes=chunk_bytes,
chunk_size=chunk_size,
)
@classmethod
def _tile_cupid(cls, op):
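        """
        Tile the read through the Cupid service.

        Table partitions are enumerated, a download session is created for
        each of them, and every returned split becomes a
        ``DataFrameReadTableSplit`` chunk; chunk row counts are taken from
        split metadata or estimated from file sizes.
        """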
from mars.core.context import get_context
df = op.outputs[0]
split_size = df.extra_params.chunk_bytes or CHUNK_BYTES_LIMIT
out_dtypes = df.dtypes
out_shape = df.shape
out_columns_value = df.columns_value
if op.columns is not None:
out_dtypes = out_dtypes[op.columns]
out_shape = (df.shape[0], len(op.columns))
out_columns_value = parse_index(out_dtypes.index, store_data=True)
mars_context = get_context()
if mars_context is not None:
worker_count = len(mars_context.get_worker_addresses())
else:
worker_count = None
cupid_client = CupidServiceClient()
try:
parts = cupid_client.enum_table_partitions(
op.odps_params, op.table_name, op.partition
)
if parts is None:
parts = [None]
out_chunks = []
chunk_idx = 0
for partition_spec in parts:
columns = op.columns
if columns is not None:
columns = (op.index_columns or []) + columns
splits, split_size = cupid_client.create_table_download_session(
op.odps_params,
op.table_name,
partition_spec,
columns,
worker_count,
split_size,
MAX_CHUNK_NUM,
op.with_split_meta_on_tile,
)
logger.info("%s table splits have been created.", str(len(splits)))
meta_chunk_rows = [split.meta_row_count for split in splits]
if np.isnan(out_shape[0]):
est_chunk_rows = meta_chunk_rows
else:
sp_file_sizes = np.array(
[sp.split_file_end - sp.split_file_start for sp in splits]
)
total_size = sp_file_sizes.sum()
ratio_chunk_rows = (
sp_file_sizes * out_shape[0] // total_size
).tolist()
est_chunk_rows = [
mr if mr is not None else rr
for mr, rr in zip(meta_chunk_rows, ratio_chunk_rows)
]
logger.warning("Estimated chunk rows: %r", est_chunk_rows)
if len(splits) == 0:
logger.info("Table %s has no data", op.table_name)
chunk_op = DataFrameReadTableSplit()
index_value = parse_index(pd.RangeIndex(0))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(np.nan, out_shape[1]),
                        dtypes=out_dtypes,
index_value=index_value,
columns_value=columns_value,
index=(chunk_idx, 0),
)
out_chunks.append(out_chunk)
chunk_idx += 1
else:
for idx, split in enumerate(splits):
chunk_op = DataFrameReadTableSplit(
cupid_handle=to_str(split.handle),
split_index=split.split_index,
split_file_start=split.split_file_start,
split_file_end=split.split_file_end,
schema_file_start=split.schema_file_start,
schema_file_end=split.schema_file_end,
index_type=op.index_type,
dtypes=out_dtypes,
sparse=op.sparse,
split_size=split_size,
string_as_binary=op.string_as_binary,
use_arrow_dtype=op.use_arrow_dtype,
estimate_rows=est_chunk_rows[idx],
partition_spec=partition_spec,
append_partitions=op.append_partitions,
meta_raw_size=split.meta_raw_size,
nrows=meta_chunk_rows[idx] or op.nrows,
memory_scale=op.memory_scale,
index_columns=op.index_columns,
extra_params=op.extra_params,
)
# the chunk shape is unknown
index_value = parse_index(pd.RangeIndex(0))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(np.nan, out_shape[1]),
dtypes=out_dtypes,
index_value=index_value,
columns_value=columns_value,
index=(chunk_idx, 0),
)
chunk_idx += 1
out_chunks.append(out_chunk)
finally:
cupid_client.close()
if op.index_type == "incremental" and _NEED_STANDARDIZE:
out_chunks = standardize_range_index(out_chunks)
new_op = op.copy()
nsplits = ((np.nan,) * len(out_chunks), (out_shape[1],))
return new_op.new_dataframes(
None,
shape=out_shape,
dtypes=op.dtypes,
index_value=df.index_value,
columns_value=out_columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
@classmethod
def _tile_tunnel(cls, op):
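        """
        Tile the read through the ODPS tunnel.

        Each selected table or partition is split into chunks according to
        ``chunk_size`` (or derived from ``chunk_bytes``), with record
        counts obtained from a tunnel reader and retried on failure.
        """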
from odps import ODPS
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
o = ODPS(
odps_params["access_id"],
odps_params["secret_access_key"],
project=odps_params["project"],
endpoint=endpoint,
)
table_obj = o.get_table(op.table_name)
if not table_obj.table_schema.partitions:
data_srcs = [table_obj]
elif op.partition is not None and check_partition_exist(
table_obj, op.partition
):
data_srcs = [table_obj.get_partition(op.partition)]
else:
data_srcs = list(table_obj.partitions)
if op.partition is not None:
data_srcs = filter_partitions(o, data_srcs, op.partition)
out_chunks = []
row_nsplits = []
index_start = 0
df = op.outputs[0]
out_dtypes = df.dtypes
out_shape = df.shape
out_columns_value = df.columns_value
if op.columns is not None:
out_dtypes = out_dtypes[op.columns]
out_shape = (df.shape[0], len(op.columns))
out_columns_value = parse_index(out_dtypes.index, store_data=True)
if len(data_srcs) == 0:
# no partitions are selected
chunk_op = DataFrameReadTableSplit()
index_value = parse_index(pd.RangeIndex(0))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(0, out_shape[1]),
                dtypes=out_dtypes,
index_value=index_value,
columns_value=columns_value,
index=(index_start, 0),
)
out_chunks.append(out_chunk)
else:
retry_times = op.retry_times or options.retry_times
for data_src in data_srcs:
data_store_size = data_src.size
retries = 0
while True:
try:
with data_src.open_reader() as reader:
record_count = reader.count
break
except:
if retries >= retry_times:
raise
retries += 1
time.sleep(1)
if data_store_size == 0:
# empty table
chunk_op = DataFrameReadTableSplit()
index_value = parse_index(pd.RangeIndex(0))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(0, out_shape[1]),
                        dtypes=out_dtypes,
index_value=index_value,
columns_value=columns_value,
index=(index_start, 0),
)
out_chunks.append(out_chunk)
index_start += 1
continue
chunk_size = df.extra_params.chunk_size
partition_spec = (
str(data_src.partition_spec)
if getattr(data_src, "partition_spec", None)
else None
)
if chunk_size is None:
chunk_bytes = df.extra_params.chunk_bytes or CHUNK_BYTES_LIMIT
                    chunk_count = ceildiv(data_store_size, chunk_bytes)
                    chunk_size = ceildiv(record_count, chunk_count)
split_size = chunk_bytes
else:
chunk_count = ceildiv(record_count, chunk_size)
split_size = data_store_size // chunk_count
for i in range(chunk_count):
start_index = chunk_size * i
end_index = min(chunk_size * (i + 1), record_count)
row_size = end_index - start_index
chunk_op = DataFrameReadTableSplit(
table_name=op.table_name,
partition_spec=partition_spec,
start_index=start_index,
end_index=end_index,
nrows=op.nrows,
odps_params=op.odps_params,
columns=op.columns,
index_type=op.index_type,
dtypes=out_dtypes,
sparse=op.sparse,
split_size=split_size,
use_arrow_dtype=op.use_arrow_dtype,
estimate_rows=row_size,
append_partitions=op.append_partitions,
memory_scale=op.memory_scale,
retry_times=op.retry_times,
tunnel_quota_name=op.tunnel_quota_name,
index_columns=op.index_columns,
extra_params=op.extra_params,
)
index_value = parse_index(pd.RangeIndex(start_index, end_index))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(row_size, out_shape[1]),
dtypes=out_dtypes,
index_value=index_value,
columns_value=columns_value,
index=(index_start + i, 0),
)
row_nsplits.append(row_size)
out_chunks.append(out_chunk)
index_start += chunk_count
if op.index_type == "incremental" and _NEED_STANDARDIZE:
out_chunks = standardize_range_index(out_chunks)
new_op = op.copy()
nsplits = (tuple(row_nsplits), (out_shape[1],))
return new_op.new_dataframes(
None,
shape=out_shape,
dtypes=op.dtypes,
index_value=df.index_value,
columns_value=out_columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
@classmethod
def _tile(cls, op):
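        """Tile via Cupid when ``CUPID_SERVICE_SOCKET`` is set, else via the tunnel."""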
if "CUPID_SERVICE_SOCKET" in os.environ:
return cls._tile_cupid(op)
else:
return cls._tile_tunnel(op)


class DataFrameReadTableSplit(*_BASE):
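    """
    Chunk-level operand that reads a single split of an ODPS table.

    Execution either consumes a Cupid split handle or opens an arrow
    reader on an ODPS tunnel download session, then converts the arrow
    table to pandas and aligns it with the expected output columns.
    """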
_op_type_ = 123451
# for cupid
cupid_handle = StringField("cupid_handle", default=None)
split_index = Int64Field("split_index", default=None)
split_file_start = Int64Field("split_file_start", default=None)
split_file_end = Int64Field("split_file_end", default=None)
schema_file_start = Int64Field("schema_file_start", default=None)
schema_file_end = Int64Field("schema_file_end", default=None)
use_arrow_dtype = BoolField("use_arrow_dtype", default=None)
string_as_binary = BoolField("string_as_binary", default=None)
dtypes = SeriesField("dtypes", default=None)
nrows = Int64Field("nrows", default=None)
index_type = StringField("index_type", default=None)
# for tunnel
table_name = StringField("table_name", default=None)
partition_spec = StringField("partition_spec", default=None)
start_index = Int64Field("start_index", default=None)
end_index = Int64Field("end_index", default=None)
odps_params = DictField("odps_params", default=None)
columns = AnyField("columns", default=None)
tunnel_quota_name = StringField("tunnel_quota_name", default=None)
split_size = Int64Field("split_size", default=None)
append_partitions = BoolField("append_partitions", default=None)
estimate_rows = Int64Field("estimate_rows", default=None)
meta_raw_size = Int64Field("meta_raw_size", default=None)
retry_times = Int64Field("retry_times", default=None)
index_columns = ListField("index_columns", FieldTypes.string, default=None)
def __init__(self, memory_scale=None, sparse=None, **kw):
super(DataFrameReadTableSplit, self).__init__(
sparse=sparse,
memory_scale=memory_scale,
_output_types=[OutputType.dataframe],
**kw
)
@property
def retryable(self):
if "CUPID_SERVICE_SOCKET" in os.environ:
return False
else:
return True
@property
def output_limit(self):
return 1
@property
def incremental_index(self):
return self.index_type == "incremental"
def set_pruned_columns(self, columns, *, keep_order=None): # pragma: no cover
if isinstance(columns, str):
columns = [columns]
self.columns = list(columns)
def get_columns(self):
return self.columns
@classmethod
def estimate_size(cls, ctx, op):
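        """
        Estimate the memory cost of reading this split.

        The split size is scaled by ``memory_scale`` (falling back to an
        assumed ORC compression ratio), and a per-row overhead, or a fixed
        factor when the row count is unknown, is added for string columns.
        """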
def is_object_dtype(dtype):
try:
return (
np.issubdtype(dtype, np.object_)
or np.issubdtype(dtype, np.unicode_)
or np.issubdtype(dtype, np.bytes_)
)
except TypeError: # pragma: no cover
return False
if op.split_size is None:
ctx[op.outputs[0].key] = (0, 0)
return
arrow_size = (op.memory_scale or ORC_COMPRESSION_RATIO) * op.split_size
if op.meta_raw_size is not None:
raw_arrow_size = (op.memory_scale or 1) * op.meta_raw_size
arrow_size = max(arrow_size, raw_arrow_size)
n_strings = len([dt for dt in op.dtypes if is_object_dtype(dt)])
if op.estimate_rows or op.nrows:
rows = op.nrows if op.nrows is not None else op.estimate_rows
pd_size = arrow_size + n_strings * rows * STRING_FIELD_OVERHEAD
logger.info("Estimate pandas memory cost: %r", pd_size)
else:
pd_size = arrow_size * 10 if n_strings else arrow_size
ctx[op.outputs[0].key] = (pd_size, pd_size + arrow_size)
@classmethod
def _cast_string_to_binary(cls, arrow_table):
import pyarrow as pa
new_schema = []
for field in arrow_table.schema:
if field.type == pa.string():
new_schema.append(pa.field(field.name, pa.binary()))
else:
new_schema.append(field)
return arrow_table.cast(pa.schema(new_schema))
@classmethod
def _append_partition_values(cls, arrow_table, op):
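        """Append partition key values as string columns if requested."""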
import pyarrow as pa
if op.append_partitions and op.partition_spec:
from odps.types import PartitionSpec
spec = PartitionSpec(op.partition_spec)
for col_name, pt_val in spec.items():
arrow_table = arrow_table.append_column(
col_name, pa.array([pt_val] * arrow_table.num_rows, pa.string())
)
return arrow_table
@staticmethod
def _align_columns(data, expected_dtypes):
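        """Rename, pad or reorder ``data`` columns to match ``expected_dtypes``."""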
data_columns = data.dtypes.index
expected_columns = expected_dtypes.index
if not data_columns.equals(expected_columns):
logger.info(
"Data columns differs from output columns, "
"data columns: %s, output columns: %s",
data_columns,
expected_columns,
)
data.columns = expected_columns[: len(data.columns)]
for extra_col in expected_columns[len(data.columns) :]:
data[extra_col] = pd.Series([], dtype=expected_dtypes[extra_col])
if not data.dtypes.index.equals(expected_columns):
data = data[expected_columns]
return data
@classmethod
def _align_output_data(cls, op, data):
if isinstance(op.outputs[0], DATAFRAME_CHUNK_TYPE):
dtypes = op.outputs[0].dtypes
data = cls._align_columns(data, dtypes)
else:
dtypes = pd.Series([op.outputs[0].dtype], index=[op.outputs[0].name])
data = cls._align_columns(data, dtypes)
data = data[op.outputs[0].name]
return data
@classmethod
def _build_empty_df(cls, out):
empty_df = pd.DataFrame()
for name, dtype in out.dtypes.items():
empty_df[name] = pd.Series(dtype=dtype)
return empty_df
@classmethod
def _execute_in_cupid(cls, ctx, op):
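        """Read the split via the Cupid service and store the aligned pandas data."""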
out = op.outputs[0]
if op.cupid_handle is None:
ctx[out.key] = cls._build_empty_df(out)
return
split_config = dict(
_handle=op.cupid_handle,
_split_index=op.split_index,
_split_file_start=op.split_file_start,
_split_file_end=op.split_file_end,
_schema_file_start=op.schema_file_start,
_schema_file_end=op.schema_file_end,
)
cupid_client = CupidServiceClient()
try:
pa_table = cupid_client.read_table_data(split_config, op.nrows)
finally:
cupid_client.close()
cupid_client = None
pa_table = cls._append_partition_values(pa_table, op)
if op.string_as_binary:
pa_table = cls._cast_string_to_binary(pa_table)
data = arrow_table_to_pandas_dataframe(
pa_table, use_arrow_dtype=op.use_arrow_dtype
)[: op.nrows]
if op.index_columns:
data = data.set_index(op.index_columns)
data = cls._align_output_data(op, data)
logger.info("Read split table finished, split index: %s", op.split_index)
logger.info("Split data shape is %s", data.shape)
ctx[out.key] = data
@classmethod
def _execute_arrow_tunnel(cls, ctx, op):
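        """
        Read the split through an ODPS tunnel arrow reader.

        Both the download session creation and the read itself are retried
        up to ``retry_times`` with a linearly growing sleep between attempts.
        """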
from odps import ODPS
from odps.tunnel import TableTunnel
out = op.outputs[0]
if op.table_name is None:
            # no backing table: source table/partition is empty or no partition matched
ctx[out.key] = cls._build_empty_df(out)
return
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
o = ODPS(
odps_params["access_id"],
odps_params["secret_access_key"],
project=odps_params["project"],
endpoint=endpoint,
)
t = o.get_table(op.table_name)
        odps_schema = t.get_schema()
        schema_name = odps_schema.name if odps_schema else None
tunnel = TableTunnel(o, project=t.project, quota_name=op.tunnel_quota_name)
retry_times = op.retry_times or options.retry_times
init_sleep_secs = 1
logger.info(
"Start creating download session for table %s(%s) start index %s end index %s retry_times %s.",
op.table_name,
op.partition_spec,
op.start_index,
op.end_index,
retry_times,
)
retries = 0
while True:
try:
if op.partition_spec is not None:
download_session = tunnel.create_download_session(
t.name,
partition_spec=op.partition_spec,
schema=schema_name,
async_mode=True,
)
else:
download_session = tunnel.create_download_session(
t.name, async_mode=True
)
break
except:
if retries >= retry_times:
raise
retries += 1
sleep_secs = retries * init_sleep_secs
                logger.exception(
                    "Create download session failed, sleep %s seconds and retry it",
                    sleep_secs,
                )
time.sleep(sleep_secs)
logger.info(
"Start reading table %s(%s) split from %s to %s",
op.table_name,
op.partition_spec,
op.start_index,
op.end_index,
)
if op.nrows is None:
count = op.end_index - op.start_index
else:
count = op.nrows
retries = 0
while True:
try:
columns = op.columns
if columns is not None:
columns = (op.index_columns or []) + op.columns
with download_session.open_arrow_reader(
op.start_index, count, columns=columns
) as reader:
table = reader.read()
break
except:
if retries >= retry_times:
raise
retries += 1
sleep_secs = retries * init_sleep_secs
                logger.exception(
                    "Read table failed, sleep %s seconds and retry it",
                    sleep_secs,
                )
time.sleep(sleep_secs)
table = cls._append_partition_values(table, op)
if op.string_as_binary:
table = cls._cast_string_to_binary(table)
data = arrow_table_to_pandas_dataframe(
table, use_arrow_dtype=op.use_arrow_dtype
)
if op.index_columns:
data = data.set_index(op.index_columns)
data = cls._align_output_data(op, data)
logger.info(
"Finish reading table %s(%s) split from %s to %s",
op.table_name,
op.partition_spec,
op.start_index,
op.end_index,
)
ctx[op.outputs[0].key] = data
@classmethod
def execute(cls, ctx, op):
if "CUPID_SERVICE_SOCKET" in os.environ:
cls._execute_in_cupid(ctx, op)
else:
cls._execute_arrow_tunnel(ctx, op)


def df_type_to_np_type(df_type, use_arrow_dtype=False):
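    """Map an ODPS DataFrame type to a numpy (or arrow string) dtype."""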
from ....df import types
from ....df.backends.pd.types import _df_to_np_types
if df_type == types.string:
if use_arrow_dtype:
return ArrowStringDtype()
else:
return np.dtype("object")
elif df_type in _df_to_np_types:
return _df_to_np_types[df_type]
elif df_type == types.timestamp:
return np.datetime64(0, "ns").dtype
else:
return np.dtype("object")


def read_odps_table(
table,
shape,
partition=None,
sparse=False,
chunk_bytes=None,
chunk_size=None,
columns=None,
odps_params=None,
add_offset=None,
use_arrow_dtype=False,
string_as_binary=None,
memory_scale=None,
append_partitions=False,
with_split_meta_on_tile=False,
index_type="incremental",
tunnel_quota_name=None,
index_columns=None,
extra_params=None,
):
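    """
    Build a Mars DataFrame that lazily reads an ODPS table.

    ``columns`` restricts the data columns to read, ``index_columns`` are
    read as well and set as the index, ``chunk_bytes`` and ``chunk_size``
    control chunk granularity, and ``add_offset`` is only kept for backward
    compatibility with ``index_type``.

    Example (illustrative sketch; the ODPS entry ``o`` and the table name
    below are placeholders, not part of this module)::

        table = o.get_table("my_table")
        df = read_odps_table(
            table,
            shape=(np.nan, len(table.table_schema.simple_columns)),
            chunk_bytes=64 * 1024**2,
            odps_params=dict(
                access_id=o.account.access_id,
                secret_access_key=o.account.secret_access_key,
                project=o.project,
                endpoint=o.endpoint,
            ),
        )
    """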
if add_offset is not None:
warnings.warn(
"add_offset is deprecated, please use index_type instead",
DeprecationWarning,
)
if add_offset in (True, False):
index_type = "incremental" if add_offset else "chunk_incremental"
if isinstance(chunk_size, (list, tuple)):
if len(chunk_size) == 1:
chunk_size = chunk_size[0]
        elif len(chunk_size) > 1:
raise ValueError("Only support split on rows")
if chunk_bytes is not None:
chunk_bytes = int(parse_readable_size(chunk_bytes)[0])
    cols = (
        table.table_schema.columns
        if append_partitions
        else table.table_schema.simple_columns
    )
table_columns = [c.name for c in cols]
table_types = [c.type for c in cols]
    df_types = [
        df_type_to_np_type(odps_type_to_df_type(tp), use_arrow_dtype=use_arrow_dtype)
        for tp in table_types
    ]
if isinstance(index_columns, str):
index_columns = [index_columns]
if index_columns and columns is None:
index_col_set = set(index_columns)
columns = [c for c in table_columns if c not in index_col_set]
if not index_columns:
index_dtypes = None
else:
table_index_types = [
df_types[table_columns.index(col)] for col in index_columns
]
index_dtypes = pd.Series(table_index_types, index=index_columns)
if columns is not None:
table_col_set = set(columns)
        if index_columns and any(col in table_col_set for col in index_columns):
raise ValueError("Index columns and columns shall not overlap.")
# reorder columns
new_columns = [c for c in table_columns if c in columns]
df_types = [df_types[table_columns.index(col)] for col in new_columns]
table_columns = new_columns
columns = new_columns
dtypes = pd.Series(df_types, index=table_columns)
retry_times = options.retry_times
op = DataFrameReadTable(
odps_params=odps_params,
table_name=table.full_table_name,
partition_spec=partition,
dtypes=dtypes,
sparse=sparse,
index_type=index_type,
columns=columns,
use_arrow_dtype=use_arrow_dtype,
string_as_binary=string_as_binary,
memory_scale=memory_scale,
append_partitions=append_partitions,
last_modified_time=to_timestamp(table.last_data_modified_time),
with_split_meta_on_tile=with_split_meta_on_tile,
retry_times=retry_times,
tunnel_quota_name=tunnel_quota_name,
index_columns=index_columns,
index_dtypes=index_dtypes,
extra_params=extra_params or dict(),
)
return op(shape, chunk_bytes=chunk_bytes, chunk_size=chunk_size)