core/maxframe/lib/filesystem/fsmap.py (108 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections.abc import MutableMapping
from urllib.parse import urlparse
from .local import LocalFileSystem
class FSMap(MutableMapping):
"""
Wrap a FileSystem instance as a mutable wrapping.
The keys of the mapping become files under the given root, and the
values (which must be bytes) the contents of those files.
Parameters
----------
root: string
prefix for all the files
fs: FileSystem instance
check: bool (=True)
performs a touch at the location, to check for write access.
"""
def __init__(self, root, fs, check=False, create=False):
self.fs = fs
self.root = self._get_path(fs, root)
if create:
if not self.fs.exists(root):
self.fs.mkdir(root)
if check:
if not self.fs.exists(root):
raise ValueError(
f"Path {root} does not exist. Create with the ``create=True`` keyword"
)
with self.fs.open(fs.pathsep.join([root, "a"]), "w"):
pass
self.fs.rm(fs.pathsep.join([root, "a"]))
@staticmethod
def _get_path(fs, path):
return path if isinstance(fs, LocalFileSystem) else urlparse(path).path
@staticmethod
def _normalize_path(fs, path, lstrip=False, rstrip=False):
if fs.pathsep != "/": # pragma: no cover
path = path.replace("/", fs.pathsep)
if lstrip:
path = path.lstrip(fs.pathsep)
if rstrip:
path = path.rstrip(fs.pathsep)
return path
@staticmethod
def _join_path(fs, paths):
if fs.pathsep == "/":
return "/".join(paths)
new_paths = []
for i, path in enumerate(paths):
path = FSMap._normalize_path(
fs, path, lstrip=i > 0, rstrip=i < len(paths) - 1
)
new_paths.append(path)
return fs.pathsep.join(new_paths)
def clear(self):
"""Remove all keys below root - empties out mapping"""
try:
self.fs.rm(self.root, True)
self.fs.mkdir(self.root)
except: # noqa: E722 # pragma: no cover
pass
def _key_to_str(self, key):
"""Generate full path for the key"""
if isinstance(key, (tuple, list)):
key = str(tuple(key))
else:
key = str(key)
return self._join_path(self.fs, [self.root, key]) if self.root else key
def _str_to_key(self, s):
"""Strip path of to leave key name"""
key = self._normalize_path(self.fs, s[len(self.root) :], lstrip=True)
if self.fs.pathsep != "/": # pragma: no cover
key = key.replace(self.fs.pathsep, "/")
return key
def __getitem__(self, key, default=None):
"""Retrieve data"""
key = self._key_to_str(key)
try:
result = self.fs.cat(key)
except: # noqa: E722
if default is not None:
return default
raise KeyError(key)
return result
def pop(self, key, default=None):
result = self.__getitem__(key, default)
try:
del self[key]
except KeyError:
pass
return result
@staticmethod
def _parent(fs, path):
path = FSMap._get_path(fs, path.rstrip(fs.pathsep))
if fs.pathsep in path:
return path.rsplit(fs.pathsep, 1)[0]
else: # pragma: no cover
return ""
def __setitem__(self, key, value):
"""Store value in key"""
key = self._key_to_str(key)
try:
self.fs.mkdir(self._parent(self.fs, key))
except FileExistsError:
pass
with self.fs.open(key, "wb") as f:
f.write(value)
@staticmethod
def _find(fs, path):
out = set()
for path, dirs, files in fs.walk(path):
out.update(fs.pathsep.join([path, f]) for f in files)
if fs.isfile(path) and path not in out:
# walk works on directories, but find should also return [path]
# when path happens to be a file
out.add(path)
return sorted(out)
def __iter__(self):
return (self._str_to_key(x) for x in self._find(self.fs, self.root))
def __len__(self):
return len(self._find(self.fs, self.root))
def __delitem__(self, key):
"""Remove key"""
try:
self.fs.rm(self._key_to_str(key))
except: # noqa: E722
raise KeyError
def __contains__(self, key):
"""Does key exist in mapping?"""
return self.fs.exists(self._key_to_str(key))