core/maxframe/lib/filesystem/oss.py (96 lines of code) (raw):
# Copyright 1999-2025 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, Iterator, List, Tuple
from urllib import parse
from ...utils import implements, lazy_import
from ._oss_lib import common as oc
from ._oss_lib.glob import glob
from ._oss_lib.handle import OSSIOBase
from .base import FileSystem, path_type
oss2 = lazy_import("oss2", placeholder=True)
_oss_time_out = 10
class OSSFileSystem(FileSystem):
_instance = None
@classmethod
def get_instance(cls):
if cls._instance is None:
cls._instance = OSSFileSystem()
return cls._instance
@implements(FileSystem.cat)
def cat(self, path: path_type):
raise NotImplementedError
@implements(FileSystem.ls)
def ls(self, path: path_type) -> List[path_type]:
file_list = []
file_entry = oc.OSSFileEntry(path)
if not file_entry.is_dir():
raise OSError("ls for file is not supported")
else:
bucket, key, access_key_id, access_key_secret, end_point = oc.parse_osspath(
path
)
oss_bucket = oss2.Bucket(
auth=oss2.Auth(
access_key_id=access_key_id, access_key_secret=access_key_secret
),
endpoint=end_point,
bucket_name=bucket,
connect_timeout=_oss_time_out,
)
for obj in oss2.ObjectIteratorV2(oss_bucket, prefix=key):
if obj.key.endswith("/"):
continue
obj_path = rf"oss://{bucket}/{obj.key}"
file_list.append(
build_oss_path(
obj_path, access_key_id, access_key_secret, end_point
)
)
return file_list
@implements(FileSystem.delete)
def delete(self, path: path_type, recursive: bool = False):
raise NotImplementedError
@implements(FileSystem.rename)
def rename(self, path: path_type, new_path: path_type):
raise NotImplementedError
@implements(FileSystem.stat)
def stat(self, path: path_type) -> Dict:
ofe = oc.OSSFileEntry(path)
return ofe.stat()
@implements(FileSystem.mkdir)
def mkdir(self, path: path_type, create_parents: bool = True):
raise NotImplementedError
@implements(FileSystem.isdir)
def isdir(self, path: path_type) -> bool:
file_entry = oc.OSSFileEntry(path)
return file_entry.is_dir()
@implements(FileSystem.isfile)
def isfile(self, path: path_type) -> bool:
file_entry = oc.OSSFileEntry(path)
return file_entry.is_file()
@implements(FileSystem._isfilestore)
def _isfilestore(self) -> bool:
raise NotImplementedError
@implements(FileSystem.exists)
def exists(self, path: path_type):
return oc.oss_exists(path)
@implements(FileSystem.open)
def open(self, path: path_type, mode: str = "rb") -> OSSIOBase:
file_handle = OSSIOBase(path, mode)
return file_handle
@implements(FileSystem.walk)
def walk(self, path: path_type) -> Iterator[Tuple[str, List[str], List[str]]]:
raise NotImplementedError
@implements(FileSystem.glob)
def glob(self, path: path_type, recursive: bool = False) -> List[path_type]:
return glob(path, recursive=recursive)
def build_oss_path(path: path_type, access_key_id, access_key_secret, end_point):
"""
Returns a path with oss info.
Used to register the access_key_id, access_key_secret and
endpoint of OSS. The access_key_id and endpoint are put
into the url with url-safe-base64 encoding.
Parameters
----------
path : path_type
The original oss url.
access_key_id : str
The access key id of oss.
access_key_secret : str
The access key secret of oss.
end_point : str
The endpoint of oss.
Returns
-------
path_type
Path include the encoded access key id, end point and
access key secret of oss.
"""
if isinstance(path, (list, tuple)):
path = path[0]
param_dict = {"access_key_id": access_key_id, "end_point": end_point}
id_endpoint = oc.dict_to_url(param_dict)
password = access_key_secret
parse_result = parse.urlparse(path)
new_path = (
f"{parse_result.scheme}://{id_endpoint}:{password}"
f"@{parse_result.netloc}{parse_result.path}"
)
return new_path