odps/models/volume_ext.py (157 lines of code) (raw):

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2024 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import contextlib

import requests

from .. import serializers
from ..compat import Enum, urlparse
from ..errors import OSSSignUrlError
from . import cache_parent
from .volume_fs import FSVolume, FSVolumeObject, FSVolumeObjects


class SignUrlMethod(Enum):
    """HTTP methods for which a signed URL can be requested."""

    GET = "get"
    PUT = "put"


class ExternalVolumeObject(FSVolumeObject):
    """
    Base class of files and directories inside an external volume.

    Data is read and written through signed URLs obtained from the service
    and then accessed directly with ``requests``.
    """

    @classmethod
    def _get_base_cls(cls):
        return ExternalVolumeObject

    @classmethod
    def _get_file_cls(cls):
        return ExternalVolumeFile

    @classmethod
    def _get_dir_cls(cls):
        return ExternalVolumeDir

    def get_sign_url(self, method, seconds=None):
        """
        Request a signed URL for the current object.

        :param method: a :class:`SignUrlMethod` member or its string value
        :param seconds: expiration of the URL in seconds. Omitted from the
            request when falsy.
        :return: the ``sign_url`` attribute parsed from the response
        """
        if isinstance(method, SignUrlMethod):
            method = method.value
        params = {"sign_url": method.lower()}
        if seconds:
            params["expire_seconds"] = seconds
        headers = {"x-odps-volume-fs-path": self.path}

        schema_name = self.volume._get_schema_name()
        if schema_name is not None:
            params["curr_schema"] = schema_name

        resp = self._client.get(
            self.parent.resource(), action="meta", params=params, headers=headers
        )
        # the response is parsed into this very object, filling ``sign_url``
        self.parse(self._client, resp, obj=self)
        return self.sign_url

    def _request_sign_url(self, path, method, *args, **kw):
        """
        Obtain a signed URL and send an HTTP request to it.

        :param path: path relative to the current object. When falsy, the
            current object itself is addressed.
        :param method: :attr:`SignUrlMethod.PUT` issues a PUT request, any
            other value issues a GET request
        :param replace_internal_host: keyword-only flag, defaults to False.
            When true and the host of the signed URL contains ``-internal.``,
            that part is replaced with ``.`` before sending the request.
        :param args: extra positional arguments passed to ``requests``
        :param kw: extra keyword arguments passed to ``requests``
        :return: the ``requests`` response, with ``volume_path`` set to the
            full path that was addressed
        :raises OSSSignUrlError: if the response status code is 400 or above
        """
        if path:
            path = self.path.rstrip("/") + "/" + path.lstrip("/")
        else:
            path = self.path
        # drop the leading "/<volume name>" part, as the volume prepends
        # it again in ExternalVolume.get_sign_url
        vol_rel_path = path[len(self.volume.name) + 1 :]
        sign_url = self.volume.get_sign_url(vol_rel_path, method)

        replace_internal_host = kw.pop("replace_internal_host", False)
        if replace_internal_host:
            parsed_url = urlparse(sign_url)
            if "-internal." in parsed_url.netloc:
                new_netloc = parsed_url.netloc.replace("-internal.", ".")
                # only the first occurrence is the host itself; signed path
                # or query parts must be kept untouched
                sign_url = sign_url.replace(parsed_url.netloc, new_netloc, 1)

        if method == SignUrlMethod.PUT:
            resp = requests.put(sign_url, *args, **kw)
        else:
            resp = requests.get(sign_url, *args, **kw)
        resp.volume_path = path
        self._check_response(resp)
        return resp

    @staticmethod
    def _check_response(resp):
        """
        Raise :class:`OSSSignUrlError` for responses with status >= 400.

        Objects without a ``status_code`` attribute are ignored. When
        ``oss2`` can be imported, the error is built from the raw response
        with ``oss2.exceptions.make_exception``, otherwise the response
        content is used as the error payload.
        """
        # when response code is a string, just skip
        if hasattr(resp, "status_code") and resp.status_code >= 400:
            try:
                import oss2.exceptions

                oss_exc = oss2.exceptions.make_exception(resp.raw)
                raise OSSSignUrlError(oss_exc)
            except ImportError:
                raise OSSSignUrlError(resp.content)

    def _delete(self, recursive=False):
        """
        Delete current object.

        :param recursive: indicate whether a recursive deletion should be performed.
        """
        params = {"recursive": str(recursive).lower()}
        headers = {"x-odps-volume-fs-path": self.path.rstrip("/")}
        self._del_cache(self.path)
        self._client.delete(
            self.parent.resource(),
            params=params,
            headers=headers,
            curr_schema=self.parent._get_schema_name(),
        )

    @contextlib.contextmanager
    def _open_reader(self, path, replace_internal_host=False):
        """
        Open a volume file and read contents in it.

        The underlying HTTP response is closed when the context exits, so
        the reader must not be used outside the ``with`` block.

        :param str path: file name to be opened. When falsy, the current
            object is opened.
        :param bool replace_internal_host: whether to replace ``-internal.``
            in the host of the signed URL with ``.``
        :return: file reader

        :Example:
        >>> with fs_dir.open_reader('file') as reader:
        >>>     [print(line) for line in reader]
        """
        resp = self._request_sign_url(
            path,
            SignUrlMethod.GET,
            stream=True,
            replace_internal_host=replace_internal_host,
        )
        try:
            yield resp.raw
        finally:
            # a streamed response holds its connection until it is closed
            resp.close()

    @contextlib.contextmanager
    def _open_writer(self, path, **kwargs):
        """
        Open a volume file and write contents into it.

        :param str path: file name to be opened. When falsy, the current
            object is opened.
        :param bool replace_internal_host: whether to replace ``-internal.``
            in the host of the signed URL with ``.``
        :return: file writer
        :raises TypeError: if a ``replication`` argument is supplied

        :Example:
        >>> with fs_dir.open_writer('file') as writer:
        >>>     writer.write('some content')
        """
        from ..tunnel.io import RequestsIO

        if kwargs.pop("replication", None) is not None:  # pragma: no cover
            raise TypeError("External volume does not support replication argument")
        replace_internal_host = kwargs.pop("replace_internal_host", False)

        def put_func(data):
            self._request_sign_url(
                path,
                SignUrlMethod.PUT,
                data=data,
                replace_internal_host=replace_internal_host,
            )

        rio = RequestsIO(put_func)
        try:
            rio.start()
            yield rio
        finally:
            rio.finish()


@cache_parent
class ExternalVolumeFile(ExternalVolumeObject):
    """A file inside an external volume."""

    def __init__(self, **kw):
        super(ExternalVolumeFile, self).__init__(**kw)
        self._isdir = False

    def delete(self):
        """
        Delete current file.
        """
        return self._delete(False)

    def open_reader(self, replace_internal_host=False):
        """
        Open current file and read contents in it.

        :param bool replace_internal_host: whether to replace ``-internal.``
            in the host of the signed URL with ``.``
        :return: file reader

        :Example:
        >>> with fs_file.open_reader() as reader:
        >>>     [print(line) for line in reader]
        """
        return self._open_reader(None, replace_internal_host=replace_internal_host)

    def open_writer(self, **kw):
        """
        Open current file and write contents into it.

        :return: file writer

        :Example:
        >>> with fs_file.open_writer() as writer:
        >>>     writer.write('some content')
        """
        return self._open_writer(None, **kw)


@cache_parent
class ExternalVolumeDir(ExternalVolumeObject):
    """A directory inside an external volume."""

    def __init__(self, **kw):
        super(ExternalVolumeDir, self).__init__(**kw)
        self._isdir = True

    @property
    def objects(self):
        """Collection of objects under the current directory."""
        return ExternalVolumeObjects(parent=self, client=self._client)

    def create_dir(self, path, replace_internal_host=False):
        """
        Creates and returns a sub-directory under the current directory.

        :param str path: directory name to be created
        :param bool replace_internal_host: whether to replace ``-internal.``
            in the host of the signed URL with ``.``
        :return: directory object
        :rtype: :class:`ExternalVolumeDir`
        """
        # a directory is created by putting an empty body to a path
        # ending with a slash
        path = path.strip("/") + "/"
        resp = self._request_sign_url(
            path,
            SignUrlMethod.PUT,
            b"",
            replace_internal_host=replace_internal_host,
        )
        dir_object = type(self)(
            path=resp.volume_path, parent=self.parent, client=self._client
        )
        dir_object.reload()
        return dir_object

    def __contains__(self, item):
        return item in self.objects

    def __iter__(self):
        return self.objects.iterate()

    def __getitem__(self, item):
        return self.objects[item]

    def delete(self, recursive=False):
        """
        Delete current directory.

        :param recursive: indicate whether a recursive deletion should be performed.
        """
        self._delete(recursive=recursive)

    def open_reader(self, path, replace_internal_host=False):
        """
        Open a volume file and read contents in it.

        :param str path: file name to be opened
        :param bool replace_internal_host: whether to replace ``-internal.``
            in the host of the signed URL with ``.``
        :return: file reader

        :Example:
        >>> with fs_dir.open_reader('file') as reader:
        >>>     [print(line) for line in reader]
        """
        return self._open_reader(path, replace_internal_host=replace_internal_host)

    def open_writer(self, path, **kwargs):
        """
        Open a volume file and write contents into it.

        :param str path: file name to be opened
        :return: file writer

        :Example:
        >>> with fs_dir.open_writer('file') as writer:
        >>>     writer.write('some content')
        """
        return self._open_writer(path, **kwargs)


class ExternalVolumeObjects(FSVolumeObjects):
    """Collection of :class:`ExternalVolumeObject` items."""

    objects = serializers.XMLNodesReferencesField(ExternalVolumeObject, "Item")

    @classmethod
    def _get_single_object_cls(cls):
        return ExternalVolumeObject


class ExternalVolume(FSVolume):
    """Volume whose directories are :class:`ExternalVolumeDir` objects."""

    _dir_cls = ExternalVolumeDir

    def get_sign_url(self, path, method, seconds=None):
        """
        Request a signed URL for a path inside the current volume.

        :param str path: path relative to the root of the volume
        :param method: a :class:`SignUrlMethod` member or its string value
        :param seconds: expiration of the URL in seconds. Omitted from the
            request when falsy.
        :return: the signed URL
        """
        path = "/" + self.name + "/" + path.lstrip("/")
        vol_file = ExternalVolumeFile(path=path, parent=self, client=self._client)
        return vol_file.get_sign_url(method, seconds=seconds)