core/maxframe/dataframe/datasource/read_parquet.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from typing import Dict
from urllib.parse import urlparse
import numpy as np
import pandas as pd

try:
    import pyarrow as pa
    import pyarrow.parquet as pq
except ImportError:
    pa = None
    pq = None

try:
    import fastparquet
except ImportError:
    fastparquet = None

from ... import opcodes
from ...config import options
from ...lib.filesystem import FileSystem, get_fs, glob, open_file
from ...serialization.serializables import (
AnyField,
BoolField,
DictField,
Int32Field,
Int64Field,
ListField,
StringField,
)
from ...utils import lazy_import
from ..arrays import ArrowStringDtype
from ..operators import OutputType
from ..utils import parse_index, to_arrow_dtypes
from .core import ColumnPruneSupportedDataSourceMixin, IncrementalIndexDatasource
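
# Heuristic constants for estimating in-memory size from raw parquet file size
# (an assumption based on their names; their consumers live outside this module).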
PARQUET_MEMORY_SCALE = 15
STRING_FIELD_OVERHEAD = 50
cudf = lazy_import("cudf")


def check_engine(engine):
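    """Resolve ``engine`` to the name of an installed parquet backend, or raise."""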
if engine == "auto":
if pa is not None:
return "pyarrow"
elif fastparquet is not None: # pragma: no cover
return "fastparquet"
else: # pragma: no cover
raise RuntimeError("Please install either pyarrow or fastparquet.")
elif engine == "pyarrow":
if pa is None: # pragma: no cover
raise RuntimeError("Please install pyarrow first.")
return engine
elif engine == "fastparquet":
if fastparquet is None: # pragma: no cover
raise RuntimeError("Please install fastparquet first.")
return engine
else: # pragma: no cover
raise RuntimeError("Unsupported engine {} to read parquet.".format(engine))


def get_engine(engine):
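    """Return the engine implementation instance for a resolved engine name."""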
if engine == "pyarrow":
return ArrowEngine()
elif engine == "fastparquet":
        return FastparquetEngine()
else: # pragma: no cover
raise RuntimeError("Unsupported engine {}".format(engine))


class ParquetEngine:
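    """Interface that concrete parquet reading backends implement."""
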
def get_row_num(self, f):
raise NotImplementedError
def read_dtypes(self, f, **kwargs):
raise NotImplementedError
def read_to_pandas(
self, f, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
raise NotImplementedError
def read_group_to_pandas(
self, f, group_index, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
raise NotImplementedError
def read_partitioned_to_pandas(
self,
f,
partitions: Dict,
partition_keys: Dict,
columns=None,
nrows=None,
use_arrow_dtype=None,
**kwargs,
):
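        # read the physical file first, then attach partition-key columns as
        # pandas categoricals built from the partition dictionaries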
raw_df = self.read_to_pandas(
f, columns=columns, nrows=nrows, use_arrow_dtype=use_arrow_dtype, **kwargs
)
for col, value in partition_keys.items():
dictionary = partitions[col]
raw_df[col] = pd.Series(
value,
dtype=pd.CategoricalDtype(categories=dictionary.tolist()),
index=raw_df.index,
)
return raw_df
def read_partitioned_dtypes(self, fs: FileSystem, directory, storage_options):
        # As ParquetDataset would iterate over all files,
        # we just locate one file here to infer dtypes
current_path = directory
partition_cols = []
while fs.isdir(current_path):
_, dirs, files = next(fs.walk(current_path))
dirs = [d for d in dirs if not d.startswith(".")]
files = [f for f in files if not f.startswith(".")]
if len(files) == 0:
# directory as partition
partition_cols.append(dirs[0].split("=", 1)[0])
current_path = os.path.join(current_path, dirs[0])
elif len(dirs) == 0:
# parquet files in deepest directory
current_path = os.path.join(current_path, files[0])
else: # pragma: no cover
raise ValueError(
"Files and directories are mixed in an intermediate directory"
)
# current path is now a parquet file
with open_file(current_path, storage_options=storage_options) as f:
dtypes = self.read_dtypes(f)
for partition in partition_cols:
dtypes[partition] = pd.CategoricalDtype()
return dtypes


def _parse_prefix(path):
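    """Return the ``scheme://netloc`` prefix of ``path``, or '' for local or non-string paths."""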
path_prefix = ""
if isinstance(path, str):
parsed_path = urlparse(path)
if parsed_path.scheme:
path_prefix = f"{parsed_path.scheme}://{parsed_path.netloc}"
return path_prefix


class ArrowEngine(ParquetEngine):
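    """Parquet reading backend based on pyarrow."""
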
def get_row_num(self, f):
file = pq.ParquetFile(f)
return file.metadata.num_rows
def read_dtypes(self, f, **kwargs):
file = pq.ParquetFile(f)
return file.schema_arrow.empty_table().to_pandas().dtypes
@classmethod
def _table_to_pandas(cls, t, nrows=None, use_arrow_dtype=None):
if nrows is not None:
t = t.slice(0, nrows)
if use_arrow_dtype:
df = t.to_pandas(types_mapper={pa.string(): ArrowStringDtype()}.get)
else:
df = t.to_pandas()
return df
def read_to_pandas(
self, f, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
file = pq.ParquetFile(f)
t = file.read(columns=columns, **kwargs)
return self._table_to_pandas(t, nrows=nrows, use_arrow_dtype=use_arrow_dtype)
def read_group_to_pandas(
self, f, group_index, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
file = pq.ParquetFile(f)
t = file.read_row_group(group_index, columns=columns, **kwargs)
return self._table_to_pandas(t, nrows=nrows, use_arrow_dtype=use_arrow_dtype)


class FastparquetEngine(ParquetEngine):
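    """Parquet reading backend based on fastparquet."""
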
def get_row_num(self, f):
file = fastparquet.ParquetFile(f)
return file.count()
def read_dtypes(self, f, **kwargs):
file = fastparquet.ParquetFile(f)
dtypes_dict = file._dtypes()
return pd.Series(dict((c, dtypes_dict[c]) for c in file.columns))
def read_to_pandas(
self, f, columns=None, nrows=None, use_arrow_dtype=None, **kwargs
):
file = fastparquet.ParquetFile(f)
df = file.to_pandas(columns, **kwargs)
if nrows is not None:
df = df.head(nrows)
if use_arrow_dtype:
df = df.astype(to_arrow_dtypes(df.dtypes).to_dict())
return df


class CudfEngine:
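    """Parquet reading backend based on cudf for GPU execution."""
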
@classmethod
def read_to_cudf(cls, file, columns: list = None, nrows: int = None, **kwargs):
df = cudf.read_parquet(file, columns=columns, **kwargs)
if nrows is not None:
df = df.head(nrows)
return df
def read_group_to_cudf(
self, file, group_index: int, columns: list = None, nrows: int = None, **kwargs
):
return self.read_to_cudf(
file, columns=columns, nrows=nrows, row_groups=group_index, **kwargs
)
@classmethod
def read_partitioned_to_cudf(
cls,
file,
partitions: Dict,
partition_keys: Dict,
columns=None,
nrows=None,
**kwargs,
):
        # cudf would read entire partitions even if only one partition is provided,
        # so we just read with pyarrow and convert to a cudf DataFrame
file = pq.ParquetFile(file)
t = file.read(columns=columns, **kwargs)
t = t.slice(0, nrows) if nrows is not None else t
t = pa.table(t.columns, names=t.column_names)
raw_df = cudf.DataFrame.from_arrow(t)
for col, value in partition_keys.items():
dictionary = partitions[col].tolist()
codes = cudf.core.column.as_column(
dictionary.index(value), length=len(raw_df)
)
raw_df[col] = cudf.core.column.build_categorical_column(
categories=dictionary,
codes=codes,
size=codes.size,
offset=codes.offset,
ordered=False,
)
return raw_df


class DataFrameReadParquet(
IncrementalIndexDatasource,
ColumnPruneSupportedDataSourceMixin,
):
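    """Datasource operator that reads parquet files into a MaxFrame DataFrame."""
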
_op_type_ = opcodes.READ_PARQUET
path = AnyField("path")
engine = StringField("engine")
columns = ListField("columns")
use_arrow_dtype = BoolField("use_arrow_dtype")
groups_as_chunks = BoolField("groups_as_chunks")
group_index = Int32Field("group_index")
read_kwargs = DictField("read_kwargs")
incremental_index = BoolField("incremental_index")
storage_options = DictField("storage_options")
is_partitioned = BoolField("is_partitioned")
merge_small_files = BoolField("merge_small_files")
merge_small_file_options = DictField("merge_small_file_options")
# for chunk
partitions = DictField("partitions", default=None)
partition_keys = DictField("partition_keys", default=None)
num_group_rows = Int64Field("num_group_rows", default=None)
    # reading metadata may be too time-consuming when the number of files is large,
    # so we only read the first file to get the row number and raw file size
first_chunk_row_num = Int64Field("first_chunk_row_num")
first_chunk_raw_bytes = Int64Field("first_chunk_raw_bytes")
def get_columns(self):
return self.columns
def set_pruned_columns(self, columns, *, keep_order=None):
self.columns = columns
def __call__(self, index_value=None, columns_value=None, dtypes=None):
self._output_types = [OutputType.dataframe]
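        # the row count is unknown before execution, hence np.nan in the shape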
shape = (np.nan, len(dtypes))
return self.new_dataframe(
None,
shape,
dtypes=dtypes,
index_value=index_value,
columns_value=columns_value,
)


def read_parquet(
path,
engine: str = "auto",
columns: list = None,
groups_as_chunks: bool = False,
use_arrow_dtype: bool = None,
incremental_index: bool = False,
storage_options: dict = None,
memory_scale: int = None,
merge_small_files: bool = True,
merge_small_file_options: dict = None,
gpu: bool = None,
**kwargs,
):
"""
Load a parquet object from the file path, returning a DataFrame.
Parameters
----------
path : str, path object or file-like object
Any valid string path is acceptable. The string could be a URL.
For file URLs, a host is expected. A local file could be:
``file://localhost/path/to/table.parquet``.
A file URL can also be a path to a directory that contains multiple
partitioned parquet files. Both pyarrow and fastparquet support
paths to directories as well as file URLs. A directory path could be:
``file://localhost/path/to/tables``.
By file-like object, we refer to objects with a ``read()`` method,
such as a file handler (e.g. via builtin ``open`` function)
or ``StringIO``.
engine : {'auto', 'pyarrow', 'fastparquet'}, default 'auto'
Parquet library to use. The default behavior is to try 'pyarrow',
falling back to 'fastparquet' if 'pyarrow' is unavailable.
columns : list, default=None
If not None, only these columns will be read from the file.
    groups_as_chunks : bool, default False
        If True, each row group corresponds to a chunk;
        if False, each file corresponds to a chunk.
        Only available for the 'pyarrow' engine.
    incremental_index : bool, default False
        If index_col is not specified, ensure the range index is incremental;
        setting it to False gains slightly better performance.
    use_arrow_dtype : bool, default None
        If True, use Arrow dtypes to store columns.
    storage_options : dict, optional
        Options for the storage connection.
    memory_scale : int, optional
        Ratio of the real memory occupation to the raw file size.
    merge_small_files : bool, default True
        Whether to merge files whose sizes are small.
    merge_small_file_options : dict
        Options for merging small files.
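    gpu : bool, default None
        Whether to read on GPU with cudf, assuming a CUDA environment with
        cudf installed.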
**kwargs
Any additional kwargs are passed to the engine.

    Returns
-------
MaxFrame DataFrame
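
    Examples
    --------
    A minimal usage sketch; it assumes ``read_parquet`` is re-exported as
    ``maxframe.dataframe.read_parquet`` and that ``data.parquet`` exists
    locally::

        import maxframe.dataframe as md

        # lazily build the read operation; execution is deferred
        df = md.read_parquet("data.parquet", columns=["a", "b"])
        # trigger execution (assuming a MaxFrame session is available)
        df.execute()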
"""
engine_type = check_engine(engine)
engine = get_engine(engine_type)
single_path = path[0] if isinstance(path, list) else path
fs = get_fs(single_path, storage_options)
is_partitioned = False
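    # infer dtypes up front by reading metadata from one representative file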
if fs.isdir(single_path):
        paths = fs.ls(single_path)
if all(fs.isdir(p) for p in paths):
# If all are directories, it is read as a partitioned dataset.
dtypes = engine.read_partitioned_dtypes(fs, path, storage_options)
is_partitioned = True
else:
with fs.open(paths[0], mode="rb") as f:
dtypes = engine.read_dtypes(f)
else:
if not isinstance(path, list):
file_path = glob(path, storage_options=storage_options)[0]
else:
file_path = path[0]
with open_file(file_path, storage_options=storage_options) as f:
dtypes = engine.read_dtypes(f)
if columns:
dtypes = dtypes[columns]
if use_arrow_dtype is None:
use_arrow_dtype = options.dataframe.use_arrow_dtype
if use_arrow_dtype:
dtypes = to_arrow_dtypes(dtypes)
index_value = parse_index(pd.RangeIndex(-1))
columns_value = parse_index(dtypes.index, store_data=True)
op = DataFrameReadParquet(
path=path,
engine=engine_type,
columns=columns,
groups_as_chunks=groups_as_chunks,
use_arrow_dtype=use_arrow_dtype,
read_kwargs=kwargs,
incremental_index=incremental_index,
storage_options=storage_options,
is_partitioned=is_partitioned,
memory_scale=memory_scale,
merge_small_files=merge_small_files,
merge_small_file_options=merge_small_file_options,
gpu=gpu,
)
return op(index_value=index_value, columns_value=columns_value, dtypes=dtypes)