# odps/models/readers.py
# Copyright 1999-2024 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from ..compat import six
from ..config import options
from ..errors import ODPSClientError
from ..lib.tblib import pickling_support
from ..readers import AbstractRecordReader
from ..utils import call_with_retry
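
# install tblib pickling support so that exception tracebacks raised in
# child read processes can be pickled and re-raised in the parent process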
pickling_support.install()


class TunnelReaderMixin(object):
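    """Shared logic for tunnel-based readers: batched iteration and
    multi-process conversion of a download session into pandas DataFrames."""
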
@property
def count(self):
        raise NotImplementedError

def _to_pandas_with_processes(
self, start=None, count=None, columns=None, append_partitions=None, n_process=1
):
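        # Fan the read out across worker processes: each worker opens its own
        # reader over a contiguous slice of rows and sends back a tuple of
        # (slice_index, dataframe_or_exc_info, succeeded) through a pipe.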
import multiprocessing
from multiprocessing import Pipe
        import pandas as pd

session_id = self._download_session.id
start = start or 0
count = count or self._download_session.count
count = min(count, self._download_session.count - start)
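        # prefer the cheap fork start method; fall back to spawn on platforms
        # where fork is unavailable, e.g. Windows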
try:
_mp_context = multiprocessing.get_context("fork")
except ValueError:
_mp_context = multiprocessing.get_context("spawn")
except AttributeError:
            # multiprocessing.get_context() does not exist on Python 2.7
_mp_context = multiprocessing
n_process = min(count, n_process)
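        # rows per worker, rounded up so that all `count` rows are covered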
split_count = count // n_process + (count % n_process != 0)
conns = []
for i in range(n_process):
parent_conn, child_conn = Pipe()
p = _mp_context.Process(
target=self._get_process_split_reader(
columns=columns, append_partitions=append_partitions
),
args=(child_conn, session_id, start, split_count, i),
)
p.start()
start += split_count
conns.append(parent_conn)
try:
results = [c.recv() for c in conns]
except EOFError:
six.raise_from(
ODPSClientError(
"Read process ended unexpectedly. Try finding errors outputed above."
),
None,
)
splits = sorted(results, key=lambda x: x[0])
if any(not d[2] for d in splits):
exc_info = next(d[1] for d in splits if not d[2])
six.reraise(*exc_info)
        return pd.concat([d[1] for d in splits]).reset_index(drop=True)

def _get_process_split_reader(self, columns=None, append_partitions=None):
        raise NotImplementedError

def _open_and_iter_reader(
self,
start,
record_count,
step=None,
compress=False,
columns=None,
append_partitions=None,
counter=None,
):
        raise NotImplementedError

def iter_pandas(
self, batch_size=None, start=None, count=None, columns=None, **kwargs
):
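        """Yield the data as pandas DataFrames holding at most ``batch_size``
        rows each, fetched lazily from the download session."""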
batch_size = batch_size or options.tunnel.read_row_batch_size
start = start or 0
count = count or self.count
for st in range(start, start + count, batch_size):
cur_batch_size = min(batch_size, count - (st - start))
yield self.to_pandas(
start=st, count=cur_batch_size, columns=columns, **kwargs
            )


class TunnelRecordReader(TunnelReaderMixin, AbstractRecordReader):
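    """Reads records of a tunnel download session one by one.

    A minimal usage sketch, assuming ``table`` is an existing pyodps table
    whose ``open_reader`` call yields a reader based on this class::

        with table.open_reader() as reader:
            for record in reader:
                print(record.values)
    """
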
def __init__(self, parent, download_session, columns=None, append_partitions=None):
self._it = iter(self)
self._parent = parent
self._download_session = download_session
self._column_names = columns
        self._append_partitions = append_partitions

@property
def download_id(self):
        return self._download_session.id

@property
def count(self):
        return self._download_session.count

@property
def status(self):
        return self._download_session.status

def __iter__(self):
for record in self.read():
            yield record

def __next__(self):
return next(self._it)
    next = __next__

def _iter(
self,
start=None,
end=None,
step=None,
compress=False,
columns=None,
append_partitions=None,
):
count = self._calc_count(start, end, step)
return self.read(
start=start,
count=count,
step=step,
compress=compress,
columns=columns,
append_partitions=append_partitions,
        )

def _open_and_iter_reader(
self,
start,
record_count,
step=None,
compress=False,
columns=None,
append_partitions=None,
counter=None,
):
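        # `counter` is a mutable one-element list so that callers can observe
        # how many records have been consumed while the generator is running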
counter = counter or [0]
with call_with_retry(
self._download_session.open_record_reader,
start,
record_count,
compress=compress,
columns=columns,
append_partitions=append_partitions,
) as reader:
for record in reader[::step]:
counter[0] += step
                yield record

def read(
self,
start=None,
count=None,
step=None,
compress=False,
append_partitions=None,
columns=None,
):
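        # clamp the requested range to the rows actually available; with a
        # step, `count` is the number of records yielded, not records scanned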
start = start or 0
step = step or 1
max_rec_count = self.count - start
rec_count = (
min(max_rec_count, count * step) if count is not None else max_rec_count
)
columns = columns or self._column_names
append_partitions = (
append_partitions
if append_partitions is not None
else self._append_partitions
)
if rec_count == 0:
return
for record in self._open_and_iter_reader(
start,
rec_count,
step=step,
compress=compress,
append_partitions=append_partitions,
columns=columns,
):
            yield record

def to_pandas(
self, start=None, count=None, columns=None, append_partitions=None, n_process=1
):
columns = columns or self._column_names
append_partitions = (
append_partitions
if append_partitions is not None
else self._append_partitions
)
if not append_partitions and columns is None:
columns = [c.name for c in self.schema.simple_columns]
if n_process == 1 or self._download_session.count == 0:
return super(TunnelRecordReader, self).to_pandas(
start=start,
count=count,
columns=columns,
append_partitions=append_partitions,
)
else:
return self._to_pandas_with_processes(
start=start,
count=count,
columns=columns,
append_partitions=append_partitions,
n_process=n_process,
            )

def __enter__(self):
        return self

def __exit__(self, exc_type, exc_val, exc_tb):
        pass


class TunnelArrowReader(TunnelReaderMixin):
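    """Reads a tunnel download session as a stream of arrow record batches.

    A minimal usage sketch, assuming ``table`` is an existing pyodps table
    and Arrow download is available::

        with table.open_reader(arrow=True) as reader:
            df = reader.to_pandas()
    """
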
def __init__(self, parent, download_session, columns=None, append_partitions=False):
self._it = iter(self)
self._parent = parent
self._download_session = download_session
self._column_names = columns
        self._append_partitions = append_partitions

@property
def download_id(self):
        return self._download_session.id

@property
def count(self):
        return self._download_session.count

@property
def status(self):
        return self._download_session.status

def __iter__(self):
for batch in self.read():
            yield batch

def __next__(self):
return next(self._it)
    next = __next__

def _open_and_iter_reader(
self,
start,
record_count,
step=None,
compress=False,
columns=None,
append_partitions=None,
counter=None,
):
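        # stream record batches until exhausted; the underlying reader
        # signals the end of data by returning None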
counter = counter or [0]
with call_with_retry(
self._download_session.open_arrow_reader,
start,
record_count,
compress=compress,
columns=columns,
append_partitions=append_partitions,
) as reader:
while True:
batch = reader.read_next_batch()
if batch is not None:
counter[0] += batch.num_rows
yield batch
else:
                    break

def read(
self,
start=None,
count=None,
compress=False,
columns=None,
append_partitions=None,
):
start = start or 0
max_rec_count = self.count - start
rec_count = min(max_rec_count, count) if count is not None else max_rec_count
columns = columns or self._column_names
append_partitions = (
append_partitions
if append_partitions is not None
else self._append_partitions
)
if rec_count == 0:
return
for batch in self._open_and_iter_reader(
start,
rec_count,
compress=compress,
columns=columns,
append_partitions=append_partitions,
):
            yield batch

def read_all(self, start=None, count=None, columns=None, append_partitions=None):
start = start or 0
count = count if count is not None else self.count - start
columns = columns or self._column_names
append_partitions = (
append_partitions
if append_partitions is not None
else self._append_partitions
)
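        # an empty range needs no tunnel roundtrip: return an empty arrow
        # table that still carries the proper schema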
if count == 0:
            from ..tunnel.io.types import odps_schema_to_arrow_schema

arrow_schema = odps_schema_to_arrow_schema(self.schema)
return arrow_schema.empty_table()
with self._download_session.open_arrow_reader(
start, count, columns=columns, append_partitions=append_partitions
) as reader:
            return reader.read()

def to_pandas(
self, start=None, count=None, columns=None, append_partitions=None, n_process=1
):
start = start or 0
count = count if count is not None else self.count - start
columns = columns or self._column_names
append_partitions = (
append_partitions
if append_partitions is not None
else self._append_partitions
)
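        # take the single-process path when there is nothing to read, as the
        # multi-process path cannot split zero rows between workers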
        if n_process == 1 or count == 0:
if count == 0:
                from ..tunnel.io.types import odps_schema_to_arrow_schema

arrow_schema = odps_schema_to_arrow_schema(self.schema)
return arrow_schema.empty_table().to_pandas()
with self._download_session.open_arrow_reader(
start, count, columns=columns, append_partitions=append_partitions
) as reader:
return reader.to_pandas()
else:
return self._to_pandas_with_processes(
start=start,
count=count,
columns=columns,
append_partitions=append_partitions,
n_process=n_process,
            )

def __enter__(self):
        return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass