odps/mars_extension/legacy/dataframe/datasource.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import numpy as np
import pandas as pd
from mars.utils import parse_readable_size, ceildiv
from mars.serialize import (
StringField,
Int64Field,
SeriesField,
DictField,
BoolField,
ListField,
ValueType,
)
from mars.dataframe import ArrowStringDtype
from mars.dataframe.operands import DataFrameOperandMixin, DataFrameOperand
from mars.dataframe.utils import (
parse_index,
standardize_range_index,
arrow_table_to_pandas_dataframe,
)
from mars.optimizes.runtime.dataframe import DataSourceHeadRule
try:
from mars.dataframe.datasource.core import (
HeadOptimizedDataSource,
ColumnPruneSupportedDataSourceMixin,
)
BASES = (HeadOptimizedDataSource, ColumnPruneSupportedDataSourceMixin)
head_can_be_opt = True
except ImportError:
BASES = (DataFrameOperand, DataFrameOperandMixin)
head_can_be_opt = False
try:
from mars.core import OutputType
_output_type_kw = dict(_output_types=[OutputType.dataframe])
except ImportError:
from mars.dataframe.operands import ObjectType
_output_type_kw = dict(_object_type=ObjectType.dataframe)
from ....df.backends.odpssql.types import odps_type_to_df_type
from ....errors import ODPSError
from ....utils import to_str, to_timestamp
from ...utils import filter_partitions, check_partition_exist
logger = logging.getLogger(__name__)
READ_CHUNK_LIMIT = 64 * 1024**2
MAX_CHUNK_SIZE = 512 * 1024**2
ORC_COMPRESSION_RATIO = 5
STRING_FIELD_OVERHEAD = 50
_Base = type("_DataSource", BASES, dict())
class DataFrameReadTable(_Base):
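    """
    Operand that reads an ODPS (MaxCompute) table into a Mars DataFrame.

    At tile time the read is split into ``DataFrameReadTableSplit`` chunks,
    either through a Cupid download session (when a Cupid runtime context is
    available) or through the ODPS tunnel.
    """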
_op_type_ = 123450
_odps_params = DictField("odps_params")
_table_name = StringField("table_name")
_partition_spec = StringField("partition_spec")
_dtypes = SeriesField("dtypes")
_add_offset = BoolField("add_offset")
_columns = ListField("columns")
_nrows = Int64Field("nrows")
_use_arrow_dtype = BoolField("use_arrow_dtype")
_string_as_binary = BoolField("string_as_binary")
_append_partitions = BoolField("append_partitions")
_last_modified_time = Int64Field("last_modified_time")
_with_split_meta_on_tile = BoolField("with_split_meta_on_tile")
_index_columns = ListField("index_columns", ValueType.string)
_index_dtypes = SeriesField("index_dtypes")
def __init__(
self,
odps_params=None,
table_name=None,
partition_spec=None,
columns=None,
dtypes=None,
nrows=None,
sparse=None,
add_offset=True,
use_arrow_dtype=None,
string_as_binary=None,
memory_scale=None,
append_partitions=None,
last_modified_time=None,
with_split_meta_on_tile=False,
index_columns=None,
index_dtypes=None,
**kw
):
kw.update(_output_type_kw)
super(DataFrameReadTable, self).__init__(
_odps_params=odps_params,
_table_name=table_name,
_partition_spec=partition_spec,
_columns=columns,
_dtypes=dtypes,
_nrows=nrows,
_sparse=sparse,
_use_arrow_dtype=use_arrow_dtype,
_string_as_binary=string_as_binary,
_add_offset=add_offset,
_append_partitions=append_partitions,
_last_modified_time=last_modified_time,
_memory_scale=memory_scale,
_with_split_meta_on_tile=with_split_meta_on_tile,
_index_columns=index_columns,
_index_dtypes=index_dtypes,
**kw
)
@property
def retryable(self):
return False
@property
def odps_params(self):
return self._odps_params
@property
def table_name(self):
return self._table_name
@property
def partition(self):
return getattr(self, "_partition_spec", None)
@property
def columns(self):
return self._columns
@property
def dtypes(self):
return self._dtypes
@property
def nrows(self):
return self._nrows
@property
def use_arrow_dtype(self):
return self._use_arrow_dtype
@property
def string_as_binary(self):
return self._string_as_binary
@property
def add_offset(self):
return self._add_offset
@property
def append_partitions(self):
return self._append_partitions
@property
def with_split_meta_on_tile(self):
return self._with_split_meta_on_tile
@property
def index_columns(self):
return self._index_columns
@property
def index_dtypes(self):
return self._index_dtypes
def get_columns(self):
return self._columns
def set_pruned_columns(self, columns):
self._columns = columns
def __call__(self, shape, chunk_bytes=None, chunk_size=None):
if not self.index_columns:
if np.isnan(shape[0]):
index_value = parse_index(pd.RangeIndex(0))
else:
index_value = parse_index(pd.RangeIndex(shape[0]))
elif len(self.index_columns) == 1:
index_value = parse_index(pd.Index([]).astype(self.index_dtypes[0]))
else:
idx = pd.MultiIndex.from_frame(
pd.DataFrame([], columns=self.index_columns).astype(self.index_dtypes)
)
index_value = parse_index(idx)
columns_value = parse_index(self.dtypes.index, store_data=True)
return self.new_dataframe(
None,
shape,
dtypes=self.dtypes,
index_value=index_value,
columns_value=columns_value,
chunk_bytes=chunk_bytes,
chunk_size=chunk_size,
)
@classmethod
def _tile_cupid(cls, op):
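        """
        Tile the read through Cupid: create a download session per table or
        partition, retrying with a doubled split size on ``CupidError``
        (typically too many splits), and emit one ``DataFrameReadTableSplit``
        chunk per split.
        """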
from odps import ODPS
from odps.accounts import BearerTokenAccount
from cupid import CupidSession, context
from cupid.errors import CupidError
from mars.context import get_context
cupid_ctx = context()
bearer_token = cupid_ctx.get_bearer_token()
account = BearerTokenAccount(bearer_token)
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
o = ODPS(
None,
None,
account=account,
project=odps_params["project"],
endpoint=endpoint,
)
cupid_session = CupidSession(o)
mars_context = get_context()
df = op.outputs[0]
split_size = df.extra_params.chunk_bytes or READ_CHUNK_LIMIT
out_dtypes = df.dtypes
out_shape = df.shape
out_columns_value = df.columns_value
if op.columns is not None:
out_dtypes = out_dtypes[op.columns]
out_shape = (df.shape[0], len(op.columns))
out_columns_value = parse_index(out_dtypes.index, store_data=True)
table_obj = o.get_table(op.table_name)
if not table_obj.table_schema.partitions:
data_srcs = [table_obj]
elif op.partition is not None and check_partition_exist(
table_obj, op.partition
):
data_srcs = [table_obj.get_partition(op.partition)]
else:
data_srcs = list(table_obj.partitions)
if op.partition is not None:
data_srcs = filter_partitions(o, data_srcs, op.partition)
out_chunks = []
chunk_idx = 0
for data_src in data_srcs:
try:
data_store_size = data_src.size
except ODPSError:
                # failed to get the data size, just ignore
pass
else:
if data_store_size < split_size and mars_context is not None:
                    # get the worker count
                    worker_count = max(len(mars_context.get_worker_addresses()), 1)
                    # data is too small, split it evenly among the workers
                    split_size = data_store_size // worker_count
                    # keep each split at least 1 MB
split_size = max(split_size, 1 * 1024**2)
logger.debug(
"Input data size is too small, split_size is %s", split_size
)
logger.debug(
"Start creating download session of table %s from cupid, "
"columns: %s",
op.table_name,
op.columns,
)
while True:
try:
download_session = cupid_session.create_download_session(
data_src,
split_size=split_size,
columns=op.columns,
with_split_meta=op.with_split_meta_on_tile,
)
break
except CupidError:
logger.debug(
"The number of splits exceeds 100000, split_size is %s",
split_size,
)
if split_size >= MAX_CHUNK_SIZE:
raise
else:
split_size *= 2
logger.debug(
"%s table splits have been created.", str(len(download_session.splits))
)
meta_chunk_rows = [
split.meta_row_count for split in download_session.splits
]
if np.isnan(out_shape[0]):
est_chunk_rows = meta_chunk_rows
else:
sp_file_sizes = np.array(
[
sp.split_file_end - sp.split_file_start
for sp in download_session.splits
]
)
total_size = sp_file_sizes.sum()
ratio_chunk_rows = (sp_file_sizes * out_shape[0] // total_size).tolist()
est_chunk_rows = [
mr if mr is not None else rr
for mr, rr in zip(meta_chunk_rows, ratio_chunk_rows)
]
partition_spec = (
str(data_src.partition_spec)
if getattr(data_src, "partition_spec", None)
else None
)
logger.warning("Estimated chunk rows: %r", est_chunk_rows)
if len(download_session.splits) == 0:
logger.debug("Table %s has no data", op.table_name)
chunk_op = DataFrameReadTableSplit()
index_value = parse_index(pd.RangeIndex(0))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(np.nan, out_shape[1]),
dtypes=op.dtypes,
index_value=index_value,
columns_value=columns_value,
index=(chunk_idx, 0),
)
out_chunks.append(out_chunk)
chunk_idx += 1
else:
for idx, split in enumerate(download_session.splits):
chunk_op = DataFrameReadTableSplit(
cupid_handle=to_str(split.handle),
split_index=split.split_index,
split_file_start=split.split_file_start,
split_file_end=split.split_file_end,
schema_file_start=split.schema_file_start,
schema_file_end=split.schema_file_end,
add_offset=op.add_offset,
dtypes=out_dtypes,
sparse=op.sparse,
split_size=split_size,
string_as_binary=op.string_as_binary,
use_arrow_dtype=op.use_arrow_dtype,
estimate_rows=est_chunk_rows[idx],
partition_spec=partition_spec,
append_partitions=op.append_partitions,
meta_raw_size=split.meta_raw_size,
nrows=meta_chunk_rows[idx] or op.nrows,
memory_scale=op.memory_scale,
index_columns=op.index_columns,
)
# the chunk shape is unknown
index_value = parse_index(pd.RangeIndex(0))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(np.nan, out_shape[1]),
dtypes=out_dtypes,
index_value=index_value,
columns_value=columns_value,
index=(chunk_idx, 0),
)
chunk_idx += 1
out_chunks.append(out_chunk)
if op.add_offset:
out_chunks = standardize_range_index(out_chunks)
new_op = op.copy()
nsplits = ((np.nan,) * len(out_chunks), (out_shape[1],))
return new_op.new_dataframes(
None,
shape=out_shape,
dtypes=op.dtypes,
index_value=df.index_value,
columns_value=out_columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
@classmethod
def _tile_tunnel(cls, op):
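        """
        Tile the read through the ODPS tunnel: derive per-chunk row ranges
        from ``chunk_size`` (or ``chunk_bytes``) for every table or partition
        and emit one ``DataFrameReadTableSplit`` chunk per range.
        """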
from odps import ODPS
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
o = ODPS(
odps_params["access_id"],
odps_params["secret_access_key"],
project=odps_params["project"],
endpoint=endpoint,
)
table_obj = o.get_table(op.table_name)
if not table_obj.table_schema.partitions:
data_srcs = [table_obj]
elif op.partition is not None and check_partition_exist(
table_obj, op.partition
):
data_srcs = [table_obj.get_partition(op.partition)]
else:
data_srcs = list(table_obj.partitions)
if op.partition is not None:
data_srcs = filter_partitions(o, data_srcs, op.partition)
out_chunks = []
row_nsplits = []
index_start = 0
df = op.outputs[0]
out_dtypes = df.dtypes
out_shape = df.shape
out_columns_value = df.columns_value
if op.columns is not None:
out_dtypes = out_dtypes[op.columns]
out_shape = (df.shape[0], len(op.columns))
out_columns_value = parse_index(out_dtypes.index, store_data=True)
for data_src in data_srcs:
data_store_size = data_src.size
shape = out_shape
chunk_size = df.extra_params.chunk_size
partition_spec = (
str(data_src.partition_spec)
if getattr(data_src, "partition_spec", None)
else None
)
if chunk_size is None:
chunk_bytes = df.extra_params.chunk_bytes or READ_CHUNK_LIMIT
chunk_count = data_store_size // chunk_bytes + (
data_store_size % chunk_bytes != 0
)
chunk_size = ceildiv(shape[0], chunk_count)
split_size = chunk_bytes
else:
chunk_count = ceildiv(shape[0], chunk_size)
split_size = data_store_size // chunk_count
for i in range(chunk_count):
start_index = chunk_size * i
end_index = min(chunk_size * (i + 1), shape[0])
row_size = end_index - start_index
chunk_op = DataFrameReadTableSplit(
table_name=op.table_name,
partition_spec=partition_spec,
start_index=start_index,
end_index=end_index,
nrows=op.nrows,
odps_params=op.odps_params,
columns=op.columns,
add_offset=op.add_offset,
dtypes=out_dtypes,
sparse=op.sparse,
split_size=split_size,
use_arrow_dtype=op.use_arrow_dtype,
estimate_rows=row_size,
append_partitions=op.append_partitions,
memory_scale=op.memory_scale,
index_columns=op.index_columns,
)
index_value = parse_index(pd.RangeIndex(start_index, end_index))
columns_value = parse_index(out_dtypes.index, store_data=True)
out_chunk = chunk_op.new_chunk(
None,
shape=(row_size, out_shape[1]),
dtypes=out_dtypes,
index_value=index_value,
columns_value=columns_value,
index=(index_start + i, 0),
)
row_nsplits.append(row_size)
out_chunks.append(out_chunk)
index_start += chunk_count
if op.add_offset:
out_chunks = standardize_range_index(out_chunks)
new_op = op.copy()
nsplits = (tuple(row_nsplits), (out_shape[1],))
return new_op.new_dataframes(
None,
shape=out_shape,
dtypes=op.dtypes,
index_value=df.index_value,
columns_value=out_columns_value,
chunks=out_chunks,
nsplits=nsplits,
)
@classmethod
def _tile(cls, op):
from cupid.runtime import RuntimeContext
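        # use the Cupid tile path inside a Cupid runtime,
        # otherwise fall back to tunnel-based tiling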
if RuntimeContext.is_context_ready():
return cls._tile_cupid(op)
else:
return cls._tile_tunnel(op)
if not head_can_be_opt:
tile = _tile
class DataFrameReadTableSplit(_Base):
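    """
    Operand that reads a single portion of an ODPS table.

    In Cupid mode the portion is identified by a Cupid split handle; in
    tunnel mode it is a row range (``start_index`` to ``end_index``) read
    through an ODPS tunnel download session.
    """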
_op_type_ = 123451
# for cupid
_cupid_handle = StringField("cupid_handle")
_split_index = Int64Field("split_index")
_split_file_start = Int64Field("split_file_start")
_split_file_end = Int64Field("split_file_end")
_schema_file_start = Int64Field("schema_file_start")
_schema_file_end = Int64Field("schema_file_end")
_use_arrow_dtype = BoolField("use_arrow_dtype")
_string_as_binary = BoolField("string_as_binary")
_dtypes = SeriesField("dtypes")
_nrows = Int64Field("nrows")
# for tunnel
_table_name = StringField("table_name")
_partition_spec = StringField("partition_spec")
_start_index = Int64Field("start_index")
_end_index = Int64Field("end_index")
_odps_params = DictField("odps_params")
_columns = ListField("columns")
_split_size = Int64Field("split_size")
_append_partitions = BoolField("append_partitions")
_estimate_rows = Int64Field("estimate_rows")
_meta_raw_size = Int64Field("meta_raw_size")
_index_columns = ListField("index_columns", ValueType.string)
def __init__(
self,
cupid_handle=None,
split_index=None,
split_file_start=None,
split_file_end=None,
schema_file_start=None,
schema_file_end=None,
table_name=None,
partition_spec=None,
start_index=None,
end_index=None,
odps_params=None,
columns=None,
nrows=None,
dtypes=None,
string_as_binary=None,
split_size=None,
use_arrow_dtype=None,
memory_scale=None,
estimate_rows=None,
meta_raw_size=None,
append_partitions=None,
sparse=None,
index_columns=None,
**kw
):
kw.update(_output_type_kw)
super(DataFrameReadTableSplit, self).__init__(
_cupid_handle=cupid_handle,
_split_index=split_index,
_split_file_start=split_file_start,
_split_file_end=split_file_end,
_schema_file_start=schema_file_start,
_schema_file_end=schema_file_end,
_table_name=table_name,
_partition_spec=partition_spec,
_columns=columns,
_start_index=start_index,
_end_index=end_index,
_odps_params=odps_params,
_use_arrow_dtype=use_arrow_dtype,
_string_as_binary=string_as_binary,
_nrows=nrows,
_estimate_rows=estimate_rows,
_split_size=split_size,
_dtypes=dtypes,
_append_partitions=append_partitions,
_sparse=sparse,
_meta_raw_size=meta_raw_size,
_memory_scale=memory_scale,
_index_columns=index_columns,
**kw
)
@property
def retryable(self):
return False
@property
def output_limit(self):
return 1
@property
def cupid_handle(self):
return self._cupid_handle
@property
def split_index(self):
return self._split_index
@property
def split_file_start(self):
return self._split_file_start
@property
def split_file_end(self):
return self._split_file_end
@property
def schema_file_start(self):
return self._schema_file_start
@property
def schema_file_end(self):
return self._schema_file_end
@property
def table_name(self):
return self._table_name
@property
def partition_spec(self):
return self._partition_spec
@property
def start_index(self):
return self._start_index
@property
def end_index(self):
return self._end_index
@property
def odps_params(self):
return self._odps_params
@property
def columns(self):
return self._columns
@property
def nrows(self):
return self._nrows
@property
def dtypes(self):
return self._dtypes
@property
def split_size(self):
return self._split_size
@property
def estimate_rows(self):
return self._estimate_rows
@property
def use_arrow_dtype(self):
return self._use_arrow_dtype
@property
def string_as_binary(self):
return self._string_as_binary
@property
def append_partitions(self):
return self._append_partitions
@property
def meta_raw_size(self):
return self._meta_raw_size
@property
def index_columns(self):
return self._index_columns
@classmethod
def estimate_size(cls, ctx, op):
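        """
        Estimate the memory cost of this chunk from the split size, an ORC
        compression ratio, and a per-row overhead for string columns.
        """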
def is_object_dtype(dtype):
try:
return (
np.issubdtype(dtype, np.object_)
or np.issubdtype(dtype, np.unicode_)
or np.issubdtype(dtype, np.bytes_)
)
except TypeError: # pragma: no cover
return False
if op.split_size is None:
ctx[op.outputs[0].key] = (0, 0)
return
arrow_size = (op.memory_scale or ORC_COMPRESSION_RATIO) * op.split_size
if op.meta_raw_size is not None:
raw_arrow_size = (op.memory_scale or 1) * op.meta_raw_size
arrow_size = max(arrow_size, raw_arrow_size)
n_strings = len([dt for dt in op.dtypes if is_object_dtype(dt)])
if op.estimate_rows or op.nrows:
rows = op.nrows if op.nrows is not None else op.estimate_rows
pd_size = arrow_size + n_strings * rows * STRING_FIELD_OVERHEAD
logger.debug("Estimate pandas memory cost: %r", pd_size)
else:
pd_size = arrow_size * 10 if n_strings else arrow_size
ctx[op.outputs[0].key] = (pd_size, pd_size + arrow_size)
@classmethod
def _cast_string_to_binary(cls, arrow_table):
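        """Cast every string column of the arrow table to binary."""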
import pyarrow as pa
new_schema = []
for field in arrow_table.schema:
if field.type == pa.string():
new_schema.append(pa.field(field.name, pa.binary()))
else:
new_schema.append(field)
return arrow_table.cast(pa.schema(new_schema))
@classmethod
def _append_partition_values(cls, arrow_table, op):
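        """Append partition key values as constant string columns when requested."""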
import pyarrow as pa
if op.append_partitions and op.partition_spec:
from odps.types import PartitionSpec
spec = PartitionSpec(op.partition_spec)
for col_name, pt_val in spec.items():
arrow_table = arrow_table.append_column(
col_name, pa.array([pt_val] * arrow_table.num_rows, pa.string())
)
return arrow_table
@staticmethod
def _align_columns(data, expected_dtypes):
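        """
        Rename and reorder ``data`` columns to match the expected output,
        filling columns that are missing from the data with empty series.
        """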
data_columns = data.dtypes.index
expected_columns = expected_dtypes.index
if not data_columns.equals(expected_columns):
logger.debug(
"Data columns differs from output columns, "
"data columns: %s, output columns: %s",
data_columns,
expected_columns,
)
data.columns = expected_columns[: len(data.columns)]
for extra_col in expected_columns[len(data.columns) :]:
data[extra_col] = pd.Series([], dtype=expected_dtypes[extra_col])
if not data.dtypes.index.equals(expected_columns):
data = data[expected_columns]
return data
@classmethod
def _execute_in_cupid(cls, ctx, op):
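        """
        Read this chunk through the Cupid arrow reader, optionally stopping
        after ``nrows`` rows, then convert it to a pandas DataFrame.
        """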
import pyarrow as pa
from cupid.io.table import TableSplit
out = op.outputs[0]
if op.cupid_handle is None:
empty_df = pd.DataFrame()
for name, dtype in out.dtypes.items():
empty_df[name] = pd.Series(dtype=dtype)
ctx[out.key] = empty_df
return
tsp = TableSplit(
_handle=op.cupid_handle,
_split_index=op.split_index,
_split_file_start=op.split_file_start,
_split_file_end=op.split_file_end,
_schema_file_start=op.schema_file_start,
_schema_file_end=op.schema_file_end,
)
logger.debug("Read split table, split index: %s", op.split_index)
reader = tsp.open_arrow_reader()
if op.nrows is None:
arrow_table = reader.read_all()
else:
nrows = 0
batches = []
while nrows < op.nrows:
try:
batch = reader.read_next_batch()
nrows += batch.num_rows
batches.append(batch)
except StopIteration:
break
logger.debug("Read %s rows of this split.", op.nrows)
arrow_table = pa.Table.from_batches(batches)
arrow_table = cls._append_partition_values(arrow_table, op)
if op.string_as_binary:
arrow_table = cls._cast_string_to_binary(arrow_table)
data = arrow_table_to_pandas_dataframe(
arrow_table, use_arrow_dtype=op.use_arrow_dtype
)
if op.index_columns:
data = data.set_index(op.index_columns)
if op.nrows is not None:
data = data[: op.nrows]
data = cls._align_columns(data, out.dtypes)
logger.debug("Read split table finished, split index: %s", op.split_index)
logger.debug(
"Split data shape is %s, size is %s",
data.shape,
data.memory_usage(deep=True).sum(),
)
ctx[out.key] = data
@classmethod
def _execute_arrow_tunnel(cls, ctx, op):
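        """
        Read this chunk's row range through an ODPS tunnel arrow reader and
        convert it to a pandas DataFrame.
        """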
from odps import ODPS
from odps.tunnel import TableTunnel
project = os.environ.get("ODPS_PROJECT_NAME", None)
odps_params = op.odps_params.copy()
if project:
odps_params["project"] = project
endpoint = os.environ.get("ODPS_RUNTIME_ENDPOINT") or odps_params["endpoint"]
o = ODPS(
odps_params["access_id"],
odps_params["secret_access_key"],
project=odps_params["project"],
endpoint=endpoint,
)
t = o.get_table(op.table_name)
schema_name = t.get_schema().name if t.get_schema() is not None else None
tunnel = TableTunnel(o, project=t.project)
if op.partition_spec is not None:
download_session = tunnel.create_download_session(
t.name, partition_spec=op.partition_spec
)
else:
download_session = tunnel.create_download_session(t.name, schema=schema_name)
logger.debug(
"Start reading table %s(%s) split from %s to %s",
op.table_name,
op.partition_spec,
op.start_index,
op.end_index,
)
if op.nrows is None:
count = op.end_index - op.start_index
else:
count = op.nrows
columns = op.columns
if columns is not None:
columns = (op.index_columns or []) + op.columns
with download_session.open_arrow_reader(
op.start_index, count, columns=columns
) as reader:
table = reader.read()
table = cls._append_partition_values(table, op)
if op.string_as_binary:
table = cls._cast_string_to_binary(table)
data = arrow_table_to_pandas_dataframe(
table, use_arrow_dtype=op.use_arrow_dtype
)
if op.index_columns:
data = data.set_index(op.index_columns)
data = cls._align_columns(data, op.outputs[0].dtypes)
logger.debug(
"Finish reading table %s(%s) split from %s to %s",
op.table_name,
op.partition_spec,
op.start_index,
op.end_index,
)
ctx[op.outputs[0].key] = data
@classmethod
def execute(cls, ctx, op):
from cupid.runtime import RuntimeContext
if RuntimeContext.is_context_ready():
cls._execute_in_cupid(ctx, op)
else:
cls._execute_arrow_tunnel(ctx, op)
def df_type_to_np_type(df_type, use_arrow_dtype=False):
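    """Map an ODPS DataFrame type to a numpy (or arrow string) dtype."""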
from ....df import types
from ....df.backends.pd.types import _df_to_np_types
if df_type == types.string:
if use_arrow_dtype:
return ArrowStringDtype()
else:
return np.dtype("object")
elif df_type in _df_to_np_types:
return _df_to_np_types[df_type]
elif df_type == types.timestamp:
return np.datetime64(0, "ns").dtype
else:
return np.dtype("object")
def read_odps_table(
table,
shape,
partition=None,
sparse=False,
chunk_bytes=None,
chunk_size=None,
columns=None,
odps_params=None,
add_offset=False,
use_arrow_dtype=False,
string_as_binary=None,
memory_scale=None,
append_partitions=False,
with_split_meta_on_tile=False,
index_columns=None,
):
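    """
    Build a ``DataFrameReadTable`` operand for an ODPS table and return the
    resulting Mars DataFrame.

    ``columns`` selects data columns, ``index_columns`` selects columns used
    as the output index, and ``chunk_bytes`` / ``chunk_size`` control how the
    table is split into chunks.

    Illustrative usage (table and parameter values are placeholders)::

        df = read_odps_table(
            o.get_table("my_table"),
            shape=(np.nan, 3),
            odps_params=dict(project="my_project", endpoint="..."),
            chunk_bytes="64M",
        )
    """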
if isinstance(chunk_size, (list, tuple)):
if len(chunk_size) == 1:
chunk_size = chunk_size[0]
        elif len(chunk_size) > 1:
raise ValueError("Only support split on rows")
if chunk_bytes is not None:
chunk_bytes = int(parse_readable_size(chunk_bytes)[0])
    cols = (
        table.table_schema.columns
        if append_partitions
        else table.table_schema.simple_columns
    )
table_columns = [c.name for c in cols]
table_types = [c.type for c in cols]
df_types = [
df_type_to_np_type(odps_type_to_df_type(type), use_arrow_dtype=use_arrow_dtype)
for type in table_types
]
if isinstance(index_columns, str):
index_columns = [index_columns]
if index_columns and columns is None:
index_col_set = set(index_columns)
columns = [c for c in table_columns if c not in index_col_set]
if not index_columns:
index_dtypes = None
else:
table_index_types = [
df_types[table_columns.index(col)] for col in index_columns
]
index_dtypes = pd.Series(table_index_types, index=index_columns)
if columns is not None:
table_col_set = set(columns)
        if index_columns and any(col in table_col_set for col in index_columns):
raise ValueError("Index columns and columns shall not overlap.")
# reorder columns
new_columns = [c for c in table_columns if c in columns]
df_types = [df_types[table_columns.index(col)] for col in new_columns]
table_columns = new_columns
columns = new_columns
dtypes = pd.Series(df_types, index=table_columns)
op = DataFrameReadTable(
odps_params=odps_params,
table_name=table.full_table_name,
partition_spec=partition,
dtypes=dtypes,
sparse=sparse,
add_offset=add_offset,
columns=columns,
use_arrow_dtype=use_arrow_dtype,
string_as_binary=string_as_binary,
memory_scale=memory_scale,
append_partitions=append_partitions,
last_modified_time=to_timestamp(table.last_data_modified_time),
with_split_meta_on_tile=with_split_meta_on_tile,
index_columns=index_columns,
index_dtypes=index_dtypes,
)
return op(shape, chunk_bytes=chunk_bytes, chunk_size=chunk_size)
class ReadODPSTableHeadRule(DataSourceHeadRule):
@staticmethod
def match(chunk, graph, keys):
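        """
        Return True when ``chunk`` is an iloc-based head on top of a
        ``DataFrameReadTableSplit`` chunk, so the head can be optimized into
        the data source read.
        """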
from mars.dataframe.indexing.iloc import DataFrameIlocGetItem, SeriesIlocGetItem
op = chunk.op
inputs = graph.predecessors(chunk)
if (
len(inputs) == 1
and isinstance(op, (DataFrameIlocGetItem, SeriesIlocGetItem))
and isinstance(inputs[0].op, DataFrameReadTableSplit)
and inputs[0].key not in keys
):
try:
is_head = op.can_be_optimized()
except AttributeError:
is_head = op.is_head()
            return is_head
return False