odps/models/volume_ext.py (157 lines of code) (raw):
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 1999-2024 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import contextlib
import requests
from .. import serializers
from ..compat import Enum, urlparse
from ..errors import OSSSignUrlError
from . import cache_parent
from .volume_fs import FSVolume, FSVolumeObject, FSVolumeObjects
class SignUrlMethod(Enum):
GET = "get"
PUT = "put"
class ExternalVolumeObject(FSVolumeObject):
@classmethod
def _get_base_cls(cls):
return ExternalVolumeObject
@classmethod
def _get_file_cls(cls):
return ExternalVolumeFile
@classmethod
def _get_dir_cls(cls):
return ExternalVolumeDir
def get_sign_url(self, method, seconds=None):
if isinstance(method, SignUrlMethod):
method = method.value
params = {"sign_url": method.lower()}
if seconds:
params["expire_seconds"] = seconds
headers = {"x-odps-volume-fs-path": self.path}
schema_name = self.volume._get_schema_name()
if schema_name is not None:
params["curr_schema"] = schema_name
resp = self._client.get(
self.parent.resource(), action="meta", params=params, headers=headers
)
self.parse(self._client, resp, obj=self)
return self.sign_url
def _request_sign_url(self, path, method, *args, **kw):
if path:
path = self.path.rstrip("/") + "/" + path.lstrip("/")
else:
path = self.path
vol_rel_path = path[len(self.volume.name) + 1 :]
sign_url = self.volume.get_sign_url(vol_rel_path, method)
replace_internal_host = kw.pop("replace_internal_host", False)
if replace_internal_host:
parsed_url = urlparse(sign_url)
if "-internal." in parsed_url.netloc:
new_netloc = parsed_url.netloc.replace("-internal.", ".")
sign_url = sign_url.replace(parsed_url.netloc, new_netloc)
if method == SignUrlMethod.PUT:
resp = requests.put(sign_url, *args, **kw)
else:
resp = requests.get(sign_url, *args, **kw)
resp.volume_path = path
self._check_response(resp)
return resp
@staticmethod
def _check_response(resp):
# when response code is a string, just skip
if hasattr(resp, "status_code") and resp.status_code >= 400:
try:
import oss2.exceptions
oss_exc = oss2.exceptions.make_exception(resp.raw)
raise OSSSignUrlError(oss_exc)
except ImportError:
raise OSSSignUrlError(resp.content)
def _delete(self, recursive=False):
"""
Delete current directory.
:param recursive: indicate whether a recursive deletion should be performed.
"""
params = {"recursive": str(recursive).lower()}
headers = {"x-odps-volume-fs-path": self.path.rstrip("/")}
self._del_cache(self.path)
self._client.delete(
self.parent.resource(),
params=params,
headers=headers,
curr_schema=self.parent._get_schema_name(),
)
@contextlib.contextmanager
def _open_reader(self, path, replace_internal_host=False):
"""
Open a volume file and read contents in it.
:param str path: file name to be opened
:return: file reader
:Example:
>>> with fs_dir.open_reader('file') as reader:
>>> [print(line) for line in reader]
"""
req = self._request_sign_url(
path,
SignUrlMethod.GET,
stream=True,
replace_internal_host=replace_internal_host,
)
yield req.raw
@contextlib.contextmanager
def _open_writer(self, path, **kwargs):
"""
Open a volume file and write contents into it.
:param str path: file name to be opened
:return: file reader
:Example:
>>> with fs_dir.open_writer('file') as reader:
>>> writer.write('some content')
"""
from ..tunnel.io import RequestsIO
if kwargs.pop("replication", None) is not None: # pragma: no cover
raise TypeError("External volume does not support replication argument")
replace_internal_host = kwargs.pop("replace_internal_host", False)
def put_func(data):
self._request_sign_url(
path,
SignUrlMethod.PUT,
data=data,
replace_internal_host=replace_internal_host,
)
rio = RequestsIO(put_func)
try:
rio.start()
yield rio
finally:
rio.finish()
@cache_parent
class ExternalVolumeFile(ExternalVolumeObject):
def __init__(self, **kw):
super(ExternalVolumeFile, self).__init__(**kw)
self._isdir = False
def delete(self):
"""
Delete current file.
"""
return self._delete(False)
def open_reader(self, replace_internal_host=False):
"""
Open current file and read contents in it.
:return: file reader
:Example:
>>> with fs_file.open_reader('file') as reader:
>>> [print(line) for line in reader]
"""
return self._open_reader(None, replace_internal_host=replace_internal_host)
def open_writer(self, **kw):
return self._open_writer(None, **kw)
@cache_parent
class ExternalVolumeDir(ExternalVolumeObject):
def __init__(self, **kw):
super(ExternalVolumeDir, self).__init__(**kw)
self._isdir = True
@property
def objects(self):
return ExternalVolumeObjects(parent=self, client=self._client)
def create_dir(self, path, replace_internal_host=False):
"""
Creates and returns a sub-directory under the current directory.
:param str path: directory name to be created
:return: directory object
:rtype: :class:`odps.models.FSVolumeDir`
"""
path = path.strip("/") + "/"
resp = self._request_sign_url(
path,
SignUrlMethod.PUT,
b"",
replace_internal_host=replace_internal_host,
)
dir_object = type(self)(
path=resp.volume_path, parent=self.parent, client=self._client
)
dir_object.reload()
return dir_object
def __contains__(self, item):
return item in self.objects
def __iter__(self):
return self.objects.iterate()
def __getitem__(self, item):
return self.objects[item]
def delete(self, recursive=False):
"""
Delete current directory.
:param recursive: indicate whether a recursive deletion should be performed.
"""
self._delete(recursive=recursive)
def open_reader(self, path, replace_internal_host=False):
"""
Open a volume file and read contents in it.
:param str path: file name to be opened
:return: file reader
:Example:
>>> with fs_dir.open_reader('file') as reader:
>>> [print(line) for line in reader]
"""
return self._open_reader(path, replace_internal_host=replace_internal_host)
def open_writer(self, path, **kwargs):
"""
Open a volume file and write contents into it.
:param str path: file name to be opened
:return: file reader
:Example:
>>> with fs_dir.open_writer('file') as reader:
>>> writer.write('some content')
"""
return self._open_writer(path, **kwargs)
class ExternalVolumeObjects(FSVolumeObjects):
objects = serializers.XMLNodesReferencesField(ExternalVolumeObject, "Item")
@classmethod
def _get_single_object_cls(cls):
return ExternalVolumeObject
class ExternalVolume(FSVolume):
_dir_cls = ExternalVolumeDir
def get_sign_url(self, path, method, seconds=None):
path = "/" + self.name + "/" + path.lstrip("/")
vol_file = ExternalVolumeFile(path=path, parent=self, client=self._client)
return vol_file.get_sign_url(method, seconds=seconds)