core/maxframe/lib/filesystem/arrow.py (178 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import subprocess
import weakref
from collections import deque
from typing import BinaryIO, Dict, Iterator, List, TextIO, Tuple, Union
from urllib.parse import urlparse

import pyarrow as pa
from pyarrow.fs import FileInfo, FileSelector
from pyarrow.fs import FileSystem as ArrowFileSystem
from pyarrow.fs import FileType
from pyarrow.fs import HadoopFileSystem as ArrowHadoopFileSystem
from pyarrow.fs import LocalFileSystem as ArrowLocalFileSystem

from ...utils import implements, stringify_path
from .core import FileSystem, path_type

__all__ = ("ArrowBasedLocalFileSystem", "HadoopFileSystem")

# When a pyarrow.fs.FileSystem is garbage collected, the underlying
# connection is closed, so we keep a reference from every opened file
# object to its filesystem to make sure the FileSystem is not collected
# before the file object. Keys are weak, hence an entry disappears
# together with its file object.
_file_to_filesystems = weakref.WeakKeyDictionary()


class ArrowBasedFileSystem(FileSystem):
    """
    FileSystem implemented with arrow fs API (>=2.0.0).

    Every operation normalizes the incoming path with ``_process_path``
    and then delegates to the wrapped ``pyarrow.fs.FileSystem``.
    """

    def __init__(self, arrow_fs: ArrowFileSystem, sequential_read=False):
        """
        Parameters
        ----------
        arrow_fs : pyarrow.fs.FileSystem
            The arrow filesystem all calls are delegated to.
        sequential_read : bool
            If True, ``open(path, "rb")`` returns a sequential-only input
            stream instead of a random-access input file.
        """
        self._arrow_fs = arrow_fs
        # for open('rb'), open a sequential reading only or not
        self._sequential_read = sequential_read

    @staticmethod
    def _process_path(path):
        """Convert a path-like object into the string form arrow expects."""
        return stringify_path(path)

    @implements(FileSystem.cat)
    def cat(self, path: path_type) -> bytes:
        path = self._process_path(path)
        # Close the stream deterministically instead of relying on gc.
        with self._arrow_fs.open_input_stream(path) as file:
            file: pa.NativeFile
            return file.read()

    @implements(FileSystem.ls)
    def ls(self, path: path_type) -> List[path_type]:
        path = self._process_path(path)
        # A default FileSelector is non-recursive: direct children only.
        file_selector: FileSelector = FileSelector(path)
        return [
            file_info.path
            for file_info in self._arrow_fs.get_file_info(file_selector)
        ]

    def _get_file_info(self, path: path_type) -> FileInfo:
        """Return the arrow ``FileInfo`` describing a single path."""
        path = self._process_path(path)
        file_info: FileInfo = self._arrow_fs.get_file_info([path])[0]
        return file_info

    @implements(FileSystem.delete)
    def delete(self, path: path_type, recursive: bool = False):
        path = self._process_path(path)
        info = self._get_file_info(path)
        if info.is_file:
            self._arrow_fs.delete_file(path)
        elif info.type == FileType.Directory:
            # arrow's delete_dir always removes the content as well, so
            # the non-recursive contract has to be enforced here.
            if not recursive and self.ls(path):
                raise OSError(f"[Errno 66] Directory not empty: '{path}'")
            self._arrow_fs.delete_dir(path)
        else:  # pragma: no cover
            raise TypeError(f"path({path}) to delete must be a file or directory")

    @implements(FileSystem.rename)
    def rename(self, path: path_type, new_path: path_type):
        path = self._process_path(path)
        new_path = self._process_path(new_path)
        self._arrow_fs.move(path, new_path)

    @implements(FileSystem.stat)
    def stat(self, path: path_type) -> Dict:
        path = self._process_path(path)
        info = self._get_file_info(path)
        if info.type == FileType.NotFound:
            # Without this guard the mtime conversion below would fail
            # with an unrelated TypeError for a missing path.
            raise FileNotFoundError(f"No such file or directory: '{path}'")

        # arrow reports an unknown modification time as None.
        mtime_ns = info.mtime_ns
        modified_time = None if mtime_ns is None else mtime_ns / 1e9
        stat = dict(name=path, size=info.size, modified_time=modified_time)
        if info.type == FileType.File:
            stat["type"] = "file"
        elif info.type == FileType.Directory:
            stat["type"] = "directory"
        else:  # pragma: no cover
            stat["type"] = "other"
        return stat

    @implements(FileSystem.mkdir)
    def mkdir(self, path: path_type, create_parents: bool = True):
        path = self._process_path(path)
        self._arrow_fs.create_dir(path, recursive=create_parents)

    @implements(FileSystem.isdir)
    def isdir(self, path: path_type) -> bool:
        path = self._process_path(path)
        info = self._get_file_info(path)
        return info.type == FileType.Directory

    @implements(FileSystem.isfile)
    def isfile(self, path: path_type) -> bool:
        path = self._process_path(path)
        info = self._get_file_info(path)
        return info.is_file

    @implements(FileSystem._isfilestore)
    def _isfilestore(self) -> bool:
        return True

    @implements(FileSystem.exists)
    def exists(self, path: path_type):
        path = self._process_path(path)
        info = self._get_file_info(path)
        return info.type != FileType.NotFound

    @implements(FileSystem.open)
    def open(self, path: path_type, mode: str = "rb") -> Union[BinaryIO, TextIO]:
        path = self._process_path(path)
        is_binary = mode.endswith("b")
        if not is_binary:  # pragma: no cover
            raise ValueError(
                f"mode can only be binary for arrow based filesystem, got {mode}"
            )
        mode = mode.rstrip("b")
        if mode == "w":
            file = self._arrow_fs.open_output_stream(path)
        elif mode == "r":
            if self._sequential_read:  # pragma: no cover
                file = self._arrow_fs.open_input_stream(path)
            else:
                file = self._arrow_fs.open_input_file(path)
        elif mode == "a":
            file = self._arrow_fs.open_append_stream(path)
        else:  # pragma: no cover
            raise ValueError(
                f'mode can only be "wb", "rb" and "ab" for '
                f"arrow based filesystem, got {mode}"
            )
        # Keep the filesystem alive for as long as the file object lives.
        _file_to_filesystems[file] = self._arrow_fs
        return file

    @implements(FileSystem.walk)
    def walk(self, path: path_type) -> Iterator[Tuple[str, List[str], List[str]]]:
        path = self._process_path(path)
        # Breadth-first traversal; a deque gives O(1) pops from the left.
        pending = deque([path])
        while pending:
            curr = pending.popleft()
            file_selector: FileSelector = FileSelector(curr)
            dirs, files = [], []
            for info in self._arrow_fs.get_file_info(file_selector):
                if info.type == FileType.File:
                    files.append(info.base_name)
                elif info.type == FileType.Directory:
                    dirs.append(info.base_name)
                    pending.append(info.path)
                else:  # pragma: no cover
                    continue
            yield curr, dirs, files

    @implements(FileSystem.glob)
    def glob(self, path: path_type, recursive: bool = False) -> List[path_type]:
        from ._glob import FileSystemGlob

        path = self._process_path(path)
        return FileSystemGlob(self).glob(path, recursive=recursive)


class ArrowBasedLocalFileSystem(ArrowBasedFileSystem):
    """Local filesystem backed by ``pyarrow.fs.LocalFileSystem``."""

    def __init__(self):
        super().__init__(ArrowLocalFileSystem())

    # Lazily created shared instance returned by ``get_instance``.
    _instance = None

    @classmethod
    def get_instance(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = ArrowBasedLocalFileSystem()
        return cls._instance


class HadoopFileSystem(ArrowBasedFileSystem):
    """HDFS filesystem backed by ``pyarrow.fs.HadoopFileSystem``."""

    def __init__(
        self,
        host="default",
        port=0,
        user=None,
        kerb_ticket=None,
        driver="libhdfs",
        extra_conf=None,
    ):
        """
        Connect to HDFS through libhdfs.

        When ``HADOOP_HOME`` is set and ``CLASSPATH`` is not, the class
        path is obtained from ``hdfs classpath --glob`` and exported to
        ``os.environ`` before the arrow filesystem is created.
        """
        assert driver == "libhdfs", "only the libhdfs driver is supported"
        if "HADOOP_HOME" in os.environ and "CLASSPATH" not in os.environ:
            hdfs_bin = os.path.join(os.environ["HADOOP_HOME"], "bin", "hdfs")
            classpath_proc = subprocess.run(
                [hdfs_bin, "classpath", "--glob"],
                stdout=subprocess.PIPE,
            )
            # Export the result only when the command succeeded, so that
            # a failure does not leave a bogus CLASSPATH behind.
            if classpath_proc.returncode == 0:
                os.environ["CLASSPATH"] = classpath_proc.stdout.decode().strip()
        arrow_fs = ArrowHadoopFileSystem(
            host=host,
            port=port,
            user=user,
            kerb_ticket=kerb_ticket,
            extra_conf=extra_conf,
        )
        super().__init__(arrow_fs)

    @staticmethod
    def _process_path(path):
        """Stringify the path and drop any ``hdfs://host:port`` prefix."""
        path = ArrowBasedFileSystem._process_path(path)
        # use urlparse to extract path from like:
        # hdfs://localhost:8020/tmp/test/simple_test.csv,
        # due to the reason that pa.fs.HadoopFileSystem cannot accept
        # path with hdfs:// prefix
        if path.startswith("hdfs://"):
            return urlparse(path).path
        else:
            return path