odps/models/volume_fs.py (383 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2024 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .. import errors, serializers, utils
from ..compat import long_type, six
from .cache import cache_parent
from .core import Iterable, LazyLoad, cache
from .volumes import Volume


class FSVolumeObject(LazyLoad):
    """
    Base class of files and directories stored in a file system volume.

    Constructing ``FSVolumeObject`` directly without ``_isdir`` loads the
    metadata of the path and returns an instance of the directory or the
    file class depending on the ``Isdir`` flag of the response.
    """

    __slots__ = ("_volume_fs_tunnel",)
    _type_indicator = "_isdir"
    _cache_name_arg = "path"

    class CreateRequestXML(serializers.XMLSerializableModel):
        # request body used when creating an object, see ``create_dir``
        _root = "Item"

        type = serializers.XMLNodeField("Type")
        path = serializers.XMLNodeField("Path")

    class UpdateRequestXML(serializers.XMLSerializableModel):
        # request body used by ``move`` and by the ``replication`` setter
        _root = "Item"

        path = serializers.XMLNodeField("Path")
        replication = serializers.XMLNodeField("Replication")

    # fields filled from the XML document parsed in ``reload``
    _project = serializers.XMLNodeField("Project")
    _volume = serializers.XMLNodeField("Volume")
    path = serializers.XMLNodeField("Path")
    _isdir = serializers.XMLNodeField("Isdir", type="bool")
    permission = serializers.XMLNodeField("permission")
    _replication = serializers.XMLNodeField("BlockReplications", parse_callback=int)
    length = serializers.XMLNodeField("Length", parse_callback=long_type)
    quota = serializers.XMLNodeField("Quota", parse_callback=long_type)
    block_size = serializers.XMLNodeField("BlockSize", parse_callback=long_type)
    owner = serializers.XMLNodeField("Owner")
    group = serializers.XMLNodeField("Group")
    creation_time = serializers.XMLNodeField("CreationTime", type="rfc822")
    access_time = serializers.XMLNodeField("AccessTime", type="rfc822")
    last_modified_time = serializers.XMLNodeField("ModificationTime", type="rfc822")
    symlink = serializers.XMLNodeField("Symlink")
    sign_url = serializers.XMLNodeField("URL")

    @classmethod
    def _get_base_cls(cls):
        """Return the class used when the object type is not known yet."""
        return FSVolumeObject

    @classmethod
    def _get_dir_cls(cls):
        """Return the class instantiated for directories."""
        return FSVolumeDir

    @classmethod
    def _get_file_cls(cls):
        """Return the class instantiated for files."""
        return FSVolumeFile

    @classmethod
    def _get_objects_cls(cls):
        """Return the collection class used to list and look up children."""
        return FSVolumeObjects

    @staticmethod
    def _filter_cache(_, **kwargs):
        """Tell whether ``_isdir`` in ``kwargs`` is set to a resolved value."""
        isdir = kwargs.get("_isdir")
        return isdir is not None and isdir != "UNKNOWN"

    @cache
    def __new__(cls, *args, **kwargs):
        """
        Allocate an instance of the class matching the ``_isdir`` keyword.

        Concrete subclasses are allocated as they are. For the base class,
        ``_isdir="UNKNOWN"`` allocates a bare base object, a boolean value
        selects the directory or the file class, and a missing value makes
        the method reload the metadata first and build the object from it.
        """
        isdir = kwargs.get("_isdir")
        base_cls = cls._get_base_cls()

        if cls is not base_cls and issubclass(cls, base_cls):
            return object.__new__(cls)
        if isdir is not None:
            if isdir == "UNKNOWN":
                return object.__new__(base_cls)
            return object.__new__(cls._get_dir_cls() if isdir else cls._get_file_cls())

        # type unknown: load metadata with a placeholder object, then rebuild
        # the object from the loaded fields so that ``_isdir`` picks the class
        obj = base_cls(_isdir="UNKNOWN", **kwargs)
        obj.reload()
        return base_cls(**obj.extract())

    @utils.experimental(
        "Volume2 is still experimental. Usage in production environment is strongly opposed.",
        cond=lambda self, *_, **kw: (
            type(self) in (FSVolumeObject, FSVolumeDir, FSVolumeFile)
            and type(kw.get("parent")) is FSVolume
        ),
    )
    def __init__(self, *args, **kwargs):
        super(FSVolumeObject, self).__init__(*args, **kwargs)

    def _name(self):
        """Return the path of the object, which serves as its name."""
        return self.path

    def _set_state(self, name, parent, client):
        """Re-initialize the object with the given path, parent and client."""
        self.__init__(path=name, _parent=parent, _client=client)

    def split(self):
        """Split the path at its last ``/`` into ``[dirname, basename]``."""
        return self.path.rsplit("/", 1)

    @property
    def basename(self):
        """Part of the path after the last ``/``."""
        _, fn = self.split()
        return fn

    @property
    def dirname(self):
        """Part of the path before the last ``/``."""
        dn, _ = self.split()
        return dn

    @property
    def volume(self):
        """Parent object of this path, used as the owning volume."""
        return self.parent

    @property
    def is_root(self):
        """Whether the path equals ``"/" + <volume name>``."""
        return self.path == "/" + self.parent.name

    def _path_in_volume(self):
        """Return the path with the leading volume name and slashes removed."""
        return self.path.lstrip("/")[len(self.parent.name) :].lstrip("/")

    def reload(self):
        """
        Reload metadata of the path from the service.

        Nothing is requested when the path is the root of the volume.
        """
        # check if the volume path is the root
        if self.is_root:
            return

        schema_name = self.parent._get_schema_name()
        params = {}
        if schema_name is not None:
            params["curr_schema"] = schema_name

        resp = self._client.get(
            self.parent.resource(),
            action="meta",
            params=params,
            headers={"x-odps-volume-fs-path": self.path},
        )
        self.parse(self._client, resp, obj=self)
        self._loaded = True

    @staticmethod
    def _normpath(path):
        """
        Normalize a ``/``-separated path.

        Empty and ``.`` components are dropped and ``..`` removes the
        component before it. The result never ends with ``/`` unless it is
        the root ``/`` itself. For absolute paths ``..`` cannot climb above
        the root; for relative paths leading ``..`` components are kept.

        :param str path: path to normalize
        :return: normalized path
        :rtype: str
        """
        is_absolute = path.startswith("/")
        parts = []
        for chunk in path.split("/"):
            if chunk in ("", "."):
                continue
            if chunk != "..":
                parts.append(chunk)
            elif parts and parts[-1] != "..":
                parts.pop()
            elif not is_absolute:
                # nothing left to cancel in a relative path: keep the ".."
                parts.append(chunk)
        normalized = "/".join(parts)
        return "/" + normalized if is_absolute else normalized

    def _del_cache(self, path):
        """
        Delete the entry of ``path`` from the objects of the volume root.

        :param str path: absolute path, or path relative to current object
        """
        root_objs = self._get_objects_cls()(
            parent=self.volume.root, client=self._client
        )
        if not path.startswith("/"):
            path = self.path + "/" + path.lstrip("/")
        del root_objs[path]

    def move(self, new_path, replication=None):
        """
        Move current path to a new location.

        :param new_path: target location of current file / directory. A path
            not starting with ``/`` is resolved against the directory
            containing current path.
        :param replication: number of replication
        :raises ValueError: if the normalized target equals current path
        """
        if not new_path.startswith("/"):
            new_path = self._normpath(self.dirname + "/" + new_path)
        else:
            new_path = self._normpath(new_path)

        if new_path == self.path:
            raise ValueError("New path should be different from the original one.")

        update_def = self.UpdateRequestXML(path=new_path)
        if replication:
            update_def.replication = replication

        headers = {
            "Content-Type": "application/xml",
            "x-odps-volume-fs-path": self.path,
        }
        schema_name = self.parent._get_schema_name()
        params = {}
        if schema_name is not None:
            params["curr_schema"] = schema_name

        self._client.put(
            self.parent.resource(),
            action="meta",
            params=params,
            headers=headers,
            data=update_def.serialize(),
        )
        # evict the entry of the old path before switching to the new one
        self._del_cache(self.path)
        self.path = new_path
        self.reload()

    def _create_volume_fs_tunnel(self, endpoint=None, quota_name=None):
        """
        Return the tunnel of this object, creating it on first use.

        Once a tunnel is created it is reused and the arguments of later
        calls are ignored.

        :param endpoint: tunnel endpoint, defaults to the one of the project
        :param quota_name: name of tunnel quota
        """
        if self._volume_fs_tunnel is not None:
            return self._volume_fs_tunnel

        from ..tunnel import VolumeFSTunnel

        self._volume_fs_tunnel = VolumeFSTunnel(
            client=self._client,
            project=self.project,
            endpoint=endpoint or self.project._tunnel_endpoint,
            quota_name=quota_name,
        )
        return self._volume_fs_tunnel


@cache_parent
class FSVolumeDir(FSVolumeObject):
    """
    FSVolumeDir represents a directory under a file system volume in ODPS. You can use
    ``create_dir`` to create a sub-directory, ``open_reader`` to open a file to read,
    ``open_writer`` to write a file and ``delete`` to remove.
    Following operations are also supported.

    >>> # iterate over all files and sub-directories
    >>> for o in fs_dir:
    >>>     print(o.path)
    >>> # check if a file exists in current volume
    >>> print(file_name in fs_dir)
    >>> # get a file/directory object
    >>> file_obj = fs_dir[file_name]
    """

    def __init__(self, **kw):
        super(FSVolumeDir, self).__init__(**kw)
        self._isdir = True

    @property
    def objects(self):
        """Collection of objects directly under current directory."""
        return self._get_objects_cls()(parent=self, client=self._client)

    def create_dir(self, path, **kw):
        """
        Creates and returns a sub-directory under the current directory.

        :param str path: directory name to be created
        :return: directory object
        :rtype: :class:`odps.models.FSVolumeDir`
        """
        path = self.path + "/" + path.lstrip("/")
        dir_def = self.CreateRequestXML(type="directory", path=path)
        headers = {"Content-Type": "application/xml"}
        self._client.post(
            self.parent.resource(),
            headers=headers,
            data=dir_def.serialize(),
            curr_schema=self.parent._get_schema_name(),
        )

        dir_object = type(self)(path=path, parent=self.parent, client=self._client)
        dir_object.reload()
        return dir_object

    def __contains__(self, item):
        return item in self.objects

    def __iter__(self):
        return self.objects.iterate()

    def __getitem__(self, item):
        return self.objects[item]

    def delete(self, recursive=False):
        """
        Delete current directory.

        :param recursive: indicate whether a recursive deletion should be performed.
        """
        params = {"recursive": recursive}
        headers = {"x-odps-volume-fs-path": self.path}
        self._del_cache(self.path)
        self._client.delete(
            self.parent.resource(),
            params=params,
            headers=headers,
            curr_schema=self.parent._get_schema_name(),
        )

    def open_reader(self, path, **kw):
        """
        Open a volume file and read contents in it.

        :param str path: file name to be opened
        :param start: start position
        :param length: length limit
        :param quota_name: name of tunnel quota
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file reader

        :Example:
        >>> with fs_dir.open_reader('file') as reader:
        >>>     [print(line) for line in reader]
        """
        endpoint = kw.pop("endpoint", None)
        quota_name = kw.pop("quota_name", None)
        tunnel = self._create_volume_fs_tunnel(endpoint=endpoint, quota_name=quota_name)
        path = self._path_in_volume() + "/" + path.lstrip("/")
        return tunnel.open_reader(self.parent, path, **kw)

    def open_writer(self, path, replication=None, **kw):
        """
        Open a volume file and write contents into it.

        :param str path: file name to be opened
        :param replication: number of replication
        :param quota_name: name of tunnel quota
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file writer

        :Example:
        >>> with fs_dir.open_writer('file') as writer:
        >>>     writer.write('some content')
        """
        endpoint = kw.pop("endpoint", None)
        quota_name = kw.pop("quota_name", None)
        tunnel = self._create_volume_fs_tunnel(endpoint=endpoint, quota_name=quota_name)
        vol_path = self._path_in_volume() + "/" + path.lstrip("/")
        return tunnel.open_writer(self.parent, vol_path, replication=replication, **kw)


@cache_parent
class FSVolumeFile(FSVolumeObject):
    """
    FSVolumeFile represents a file under a file system volume in ODPS. You can use
    ``open_reader`` to read the file, ``open_writer`` to write it and ``delete`` to
    remove it.
    """

    def __init__(self, **kw):
        super(FSVolumeFile, self).__init__(**kw)
        self._isdir = False

    @property
    def replication(self):
        """
        Get / set replication number of the file.
        """
        return self._replication

    @replication.setter
    def replication(self, value):
        update_def = self.UpdateRequestXML(replication=value)
        headers = {
            "Content-Type": "application/xml",
            "x-odps-volume-fs-path": self.path,
        }
        schema_name = self.parent._get_schema_name()
        params = {}
        if schema_name is not None:
            params["curr_schema"] = schema_name

        self._client.put(
            self.parent.resource(),
            action="meta",
            params=params,
            headers=headers,
            data=update_def.serialize(),
        )
        # refresh fields so that the getter returns the value from the service
        self.reload()

    def delete(self, **_):
        """
        Delete current file.

        Keyword arguments are accepted and ignored, which allows callers to
        pass ``recursive`` to files and directories alike.
        """
        params = {"recursive": False}
        headers = {"x-odps-volume-fs-path": self.path}
        schema_name = self.parent._get_schema_name()
        if schema_name is not None:
            params["curr_schema"] = schema_name

        self._del_cache(self.path)
        self._client.delete(self.parent.resource(), params=params, headers=headers)

    def open_reader(self, **kw):
        """
        Open current file and read contents in it.

        :param start: start position
        :param length: length limit
        :param quota_name: name of tunnel quota
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file reader

        :Example:
        >>> with fs_file.open_reader() as reader:
        >>>     [print(line) for line in reader]
        """
        endpoint = kw.pop("endpoint", None)
        quota_name = kw.pop("quota_name", None)
        tunnel = self._create_volume_fs_tunnel(endpoint=endpoint, quota_name=quota_name)
        path = self._path_in_volume()
        return tunnel.open_reader(self.parent, path, **kw)

    def open_writer(self, replication=None, **kw):
        """
        Open current file and write contents into it.

        :param replication: number of replication
        :param quota_name: name of tunnel quota
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file writer

        :Example:
        >>> with fs_file.open_writer() as writer:
        >>>     writer.write('some content')
        """
        endpoint = kw.pop("endpoint", None)
        quota_name = kw.pop("quota_name", None)
        tunnel = self._create_volume_fs_tunnel(endpoint=endpoint, quota_name=quota_name)
        path = self._path_in_volume()
        return tunnel.open_writer(self.parent, path, replication=replication, **kw)


class FSVolumeObjects(Iterable):
    """
    Collection of the objects under a directory of a file system volume.

    The parent of the collection is the directory and the parent of the
    directory is the volume.
    """

    marker = serializers.XMLNodeField("Marker")
    max_items = serializers.XMLNodeField("MaxItems", parse_callback=int)
    objects = serializers.XMLNodesReferencesField(FSVolumeObject, "Item")

    @property
    def project(self):
        """Parent of the volume holding the listed directory."""
        return self.parent.parent.parent

    @property
    def volume(self):
        """Parent of the listed directory."""
        return self.parent.parent

    @classmethod
    def _get_single_object_cls(cls):
        """Return the class used to build a single object of the collection."""
        return FSVolumeObject

    def _get(self, name):
        """
        Build the object for ``name``.

        :param str name: a path starting with the path of the listed
            directory is used as it is, any other value is appended to the
            path of the directory
        """
        path = name
        if not path.startswith(self.parent.path):
            path = self.parent.path + "/" + name.lstrip("/")
        return self._get_single_object_cls()(
            client=self._client, parent=self.volume, path=path
        )

    def __contains__(self, item):
        """
        Check whether a path or an object exists.

        :param item: path string or object instance
        :return: ``True`` if the lookup or the reload succeeds, ``False`` if
            :class:`errors.NoSuchObject` is raised or the type of ``item``
            is not supported
        :rtype: bool
        """
        if isinstance(item, six.string_types):
            try:
                # FSVolumeObject.__new__ reloads an object whose type is not
                # known yet, thus building it without error is the check
                self._get(item)
            except errors.NoSuchObject:
                return False
            return True
        elif isinstance(item, self._get_single_object_cls()):
            try:
                item.reload()
            except errors.NoSuchObject:
                return False
            return True
        else:
            return False

    def __iter__(self):
        return self.iterate()

    def iterate(self):
        """
        Iterate over the objects of the directory page by page.

        :return: generator yielding the objects of every page
        """
        params = {"expectmarker": "true"}
        headers = {"x-odps-volume-fs-path": self.parent.path}
        schema_name = self.volume._get_schema_name()
        if schema_name is not None:
            params["curr_schema"] = schema_name

        def _it():
            # a marker that is present but empty means the previous page
            # was the last one
            last_marker = params.get("marker")
            if "marker" in params and (last_marker is None or len(last_marker) == 0):
                return

            url = self.volume.resource()
            resp = self._client.get(url, params=params, headers=headers)

            r = type(self).parse(self._client, resp, obj=self, parent=self.volume)
            params["marker"] = r.marker

            return r.objects

        while True:
            objects = _it()
            if objects is None:
                break
            for obj in objects:
                yield obj


@cache_parent
class FSVolume(Volume):
    """
    FSVolume represents the new-fashioned file system volume in ODPS. You can use
    ``create_dir`` to create a directory, ``open_reader`` to open a file to read
    and ``open_writer`` to write a file.
    Following operations are also supported.

    >>> # iterate over all files and directories
    >>> for o in fs_volume:
    >>>     print(o.path)
    >>> # check if a file exists in current volume
    >>> print(file_name in fs_volume)
    >>> # get a file/directory object
    >>> file_obj = fs_volume[file_name]
    """

    __slots__ = ("_root_dir",)
    _dir_cls = FSVolumeDir

    def create_dir(self, path, **kw):
        """
        Creates and returns a directory under the current volume.

        :param str path: directory name to be created
        :return: directory object
        :rtype: :class:`odps.models.FSVolumeDir`
        """
        return self.root.create_dir(path, **kw)

    def __contains__(self, item):
        return item in self.root

    def __iter__(self):
        for it in self.root:
            yield it

    def __getitem__(self, item):
        # an empty key stands for the root directory itself
        if not item:
            return self.root
        return self.root[item]

    @property
    def path(self):
        """Path of the root directory, ``"/" + <volume name>``."""
        return "/" + self.name

    @property
    def location(self):
        """
        Value of the location property of an external volume.

        :raises AttributeError: if the volume is not an external volume
        """
        if self.type != Volume.Type.EXTERNAL:
            raise AttributeError
        return self.properties.get(Volume.EXTERNAL_VOLUME_LOCATION_KEY)

    @property
    def rolearn(self):
        """
        Value of the role ARN property of an external volume.

        :raises AttributeError: if the volume is not an external volume
        """
        if self.type != Volume.Type.EXTERNAL:
            raise AttributeError
        return self.properties.get(Volume.EXTERNAL_VOLUME_ROLEARN_KEY)

    def open_reader(self, path, **kw):
        """
        Open a volume file and read contents in it.

        :param str path: file name to be opened
        :param start: start position
        :param length: length limit
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file reader

        :Example:
        >>> with volume.open_reader('file') as reader:
        >>>     [print(line) for line in reader]
        """
        return self.root.open_reader(path, **kw)

    def open_writer(self, path, replication=None, **kw):
        """
        Open a volume file and write contents into it.

        :param str path: file name to be opened
        :param replication: number of replication
        :param compress_option: the compression algorithm, level and strategy
        :type compress_option: :class:`odps.tunnel.CompressOption`
        :return: file writer

        :Example:
        >>> with volume.open_writer('file') as writer:
        >>>     writer.write('some content')
        """
        return self.root.open_writer(path, replication=replication, **kw)

    def delete(self, path, recursive=False):
        """
        Delete a file or a directory under current volume.

        :param str path: path of the object to delete
        :param recursive: indicate whether a recursive deletion should be performed
        """
        try:
            return self[path].delete(recursive=recursive)
        except TypeError:
            # fall back for objects whose ``delete`` takes no ``recursive``
            return self[path].delete()

    @property
    def root(self):
        """Root directory object of the volume, created on first access."""
        if not self._root_dir:
            self._root_dir = self._dir_cls(
                path="/" + self.name, parent=self, client=self._client
            )
        return self._root_dir


# keep renames compatible
VolumeFSDir = FSVolumeDir
VolumeFSFile = FSVolumeFile