core/maxframe/lib/filesystem/fsmap.py (108 lines of code) (raw):

# Copyright 1999-2025 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections.abc import MutableMapping from urllib.parse import urlparse from .local import LocalFileSystem class FSMap(MutableMapping): """ Wrap a FileSystem instance as a mutable wrapping. The keys of the mapping become files under the given root, and the values (which must be bytes) the contents of those files. Parameters ---------- root: string prefix for all the files fs: FileSystem instance check: bool (=True) performs a touch at the location, to check for write access. """ def __init__(self, root, fs, check=False, create=False): self.fs = fs self.root = self._get_path(fs, root) if create: if not self.fs.exists(root): self.fs.mkdir(root) if check: if not self.fs.exists(root): raise ValueError( f"Path {root} does not exist. Create with the ``create=True`` keyword" ) with self.fs.open(fs.pathsep.join([root, "a"]), "w"): pass self.fs.rm(fs.pathsep.join([root, "a"])) @staticmethod def _get_path(fs, path): return path if isinstance(fs, LocalFileSystem) else urlparse(path).path @staticmethod def _normalize_path(fs, path, lstrip=False, rstrip=False): if fs.pathsep != "/": # pragma: no cover path = path.replace("/", fs.pathsep) if lstrip: path = path.lstrip(fs.pathsep) if rstrip: path = path.rstrip(fs.pathsep) return path @staticmethod def _join_path(fs, paths): if fs.pathsep == "/": return "/".join(paths) new_paths = [] for i, path in enumerate(paths): path = FSMap._normalize_path( fs, path, lstrip=i > 0, rstrip=i < len(paths) - 1 ) new_paths.append(path) return fs.pathsep.join(new_paths) def clear(self): """Remove all keys below root - empties out mapping""" try: self.fs.rm(self.root, True) self.fs.mkdir(self.root) except: # noqa: E722 # pragma: no cover pass def _key_to_str(self, key): """Generate full path for the key""" if isinstance(key, (tuple, list)): key = str(tuple(key)) else: key = str(key) return self._join_path(self.fs, [self.root, key]) if self.root else key def _str_to_key(self, s): """Strip path of to leave key name""" key = self._normalize_path(self.fs, s[len(self.root) :], lstrip=True) if self.fs.pathsep != "/": # pragma: no cover key = key.replace(self.fs.pathsep, "/") return key def __getitem__(self, key, default=None): """Retrieve data""" key = self._key_to_str(key) try: result = self.fs.cat(key) except: # noqa: E722 if default is not None: return default raise KeyError(key) return result def pop(self, key, default=None): result = self.__getitem__(key, default) try: del self[key] except KeyError: pass return result @staticmethod def _parent(fs, path): path = FSMap._get_path(fs, path.rstrip(fs.pathsep)) if fs.pathsep in path: return path.rsplit(fs.pathsep, 1)[0] else: # pragma: no cover return "" def __setitem__(self, key, value): """Store value in key""" key = self._key_to_str(key) try: self.fs.mkdir(self._parent(self.fs, key)) except FileExistsError: pass with self.fs.open(key, "wb") as f: f.write(value) @staticmethod def _find(fs, path): out = set() for path, dirs, files in fs.walk(path): out.update(fs.pathsep.join([path, f]) for f in files) if fs.isfile(path) and path not in out: # walk works on directories, but find should also return [path] # when path happens to be a file out.add(path) return sorted(out) def __iter__(self): return (self._str_to_key(x) for x in self._find(self.fs, self.root)) def __len__(self): return len(self._find(self.fs, self.root)) def __delitem__(self, key): """Remove key""" try: self.fs.rm(self._key_to_str(key)) except: # noqa: E722 raise KeyError def __contains__(self, key): """Does key exist in mapping?""" return self.fs.exists(self._key_to_str(key))