# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""IO read functions using global context."""

from __future__ import annotations

from typing import TYPE_CHECKING

from datafusion.context import SessionContext
from datafusion.dataframe import DataFrame

if TYPE_CHECKING:
    import pathlib

    import pyarrow as pa

    from datafusion.expr import Expr


def read_parquet(
    path: str | pathlib.Path,
    table_partition_cols: list[tuple[str, str]] | None = None,
    parquet_pruning: bool = True,
    file_extension: str = ".parquet",
    skip_metadata: bool = True,
    schema: pa.Schema | None = None,
    file_sort_order: list[list[Expr]] | None = None,
) -> DataFrame:
    """Read a Parquet source into a :py:class:`~datafusion.dataframe.DataFrame`.

    This function will use the global context. Any functions or tables registered
    with another context may not be accessible when used with a DataFrame created
    using this function.

    Args:
        path: Path to the Parquet file.
        table_partition_cols: Partition columns.
        parquet_pruning: Whether the parquet reader should use the predicate
            to prune row groups.
        file_extension: File extension; only files with this extension are
            selected for data input.
        skip_metadata: Whether the parquet reader should skip any metadata
            that may be in the file schema. This can help avoid schema
            conflicts due to metadata.
        schema: An optional schema representing the parquet files. If None,
            the parquet reader will try to infer it based on data in the
            file.
        file_sort_order: Sort order for the file.

    Returns:
        DataFrame representation of the read Parquet files.
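
    Example:
        A minimal usage sketch; the path ``data/example.parquet`` is purely
        illustrative and not part of the library::

            from datafusion.io import read_parquet

            # "data/example.parquet" is a hypothetical local file
            df = read_parquet("data/example.parquet")
            df.show()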
"""
if table_partition_cols is None:
table_partition_cols = []
return SessionContext.global_ctx().read_parquet(
str(path),
table_partition_cols,
parquet_pruning,
file_extension,
skip_metadata,
schema,
file_sort_order,
)


def read_json(
    path: str | pathlib.Path,
    schema: pa.Schema | None = None,
    schema_infer_max_records: int = 1000,
    file_extension: str = ".json",
    table_partition_cols: list[tuple[str, str]] | None = None,
    file_compression_type: str | None = None,
) -> DataFrame:
    """Read a line-delimited JSON data source.

    This function will use the global context. Any functions or tables registered
    with another context may not be accessible when used with a DataFrame created
    using this function.

    Args:
        path: Path to the JSON file.
        schema: The data source schema.
        schema_infer_max_records: Maximum number of rows to read from JSON
            files for schema inference if needed.
        file_extension: File extension; only files with this extension are
            selected for data input.
        table_partition_cols: Partition columns.
        file_compression_type: File compression type.

    Returns:
        DataFrame representation of the read JSON files.
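
    Example:
        A minimal usage sketch; ``data/example.json`` is an assumed path to a
        newline-delimited JSON file::

            from datafusion.io import read_json

            # "data/example.json" is a hypothetical local file
            df = read_json("data/example.json")
            df.show()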
"""
if table_partition_cols is None:
table_partition_cols = []
return SessionContext.global_ctx().read_json(
str(path),
schema,
schema_infer_max_records,
file_extension,
table_partition_cols,
file_compression_type,
)


def read_csv(
    path: str | pathlib.Path | list[str] | list[pathlib.Path],
    schema: pa.Schema | None = None,
    has_header: bool = True,
    delimiter: str = ",",
    schema_infer_max_records: int = 1000,
    file_extension: str = ".csv",
    table_partition_cols: list[tuple[str, str]] | None = None,
    file_compression_type: str | None = None,
) -> DataFrame:
    """Read a CSV data source.

    This function will use the global context. Any functions or tables registered
    with another context may not be accessible when used with a DataFrame created
    using this function.

    Args:
        path: Path to the CSV file(s). Accepts a single path or a list of paths.
        schema: An optional schema representing the CSV files. If None, the
            CSV reader will try to infer it based on data in the file.
        has_header: Whether the CSV file has a header. If schema inference
            is run on a file with no headers, default column names are
            created.
        delimiter: An optional column delimiter.
        schema_infer_max_records: Maximum number of rows to read from CSV
            files for schema inference if needed.
        file_extension: File extension; only files with this extension are
            selected for data input.
        table_partition_cols: Partition columns.
        file_compression_type: File compression type.

    Returns:
        DataFrame representation of the read CSV files.
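
    Example:
        A minimal usage sketch; ``data/example.csv`` is an assumed,
        illustrative path::

            from datafusion.io import read_csv

            # "data/example.csv" is a hypothetical semicolon-delimited file
            df = read_csv("data/example.csv", has_header=True, delimiter=";")
            df.show()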
"""
if table_partition_cols is None:
table_partition_cols = []
path = [str(p) for p in path] if isinstance(path, list) else str(path)
return SessionContext.global_ctx().read_csv(
path,
schema,
has_header,
delimiter,
schema_infer_max_records,
file_extension,
table_partition_cols,
file_compression_type,
)


def read_avro(
    path: str | pathlib.Path,
    schema: pa.Schema | None = None,
    file_partition_cols: list[tuple[str, str]] | None = None,
    file_extension: str = ".avro",
) -> DataFrame:
    """Create a :py:class:`DataFrame` for reading an Avro data source.

    This function will use the global context. Any functions or tables registered
    with another context may not be accessible when used with a DataFrame created
    using this function.

    Args:
        path: Path to the Avro file.
        schema: The data source schema.
        file_partition_cols: Partition columns.
        file_extension: File extension to select.

    Returns:
        DataFrame representation of the read Avro file.
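
    Example:
        A minimal usage sketch; ``data/example.avro`` is an assumed,
        illustrative path::

            from datafusion.io import read_avro

            # "data/example.avro" is a hypothetical local file
            df = read_avro("data/example.avro")
            df.show()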
"""
if file_partition_cols is None:
file_partition_cols = []
return SessionContext.global_ctx().read_avro(
str(path), schema, file_partition_cols, file_extension
)