oss-model-connector/ossmodelconnector/oss_model_connector.py (94 lines of code) (raw):

from ._oss_connector import new_oss_connector, Connector import ctypes import torch import builtins import pathlib from typing import Any class UntypedStorageEx: def __init__(self, file, size): self.file = file self.addr = memoryview((ctypes.c_ubyte * size).from_address(self.file.mmap())) def untyped(self): return self def __getitem__(self, idx): return self.addr[idx] class OssModelConnector: """ A connector class for interfacing with OSS for model loading, providing high-performance methods to load models/objects/files for AI inference. """ def __init__( self, endpoint: str, cred_path: str = "", config_path: str = "", cred_provider: Any = None, ): """ Initializes the connector with endpoint and optional credential information. Args: endpoint(str): The OSS endpoint to connect to. cred_path(str, optional): Path to the credential file. Defaults to "". config_path(str, optional): Path to the configuration file. Defaults to "". cred_provider(Any, optional): Credential provider. Defaults to None. Raises: ValueError: If endpoint or credential is not provided. """ if not endpoint: raise ValueError("endpoint must be non-empty") if cred_provider is None and not cred_path: raise ValueError("Either cred_path or cred_provider must be provided") self._endpoint = endpoint if not cred_path: self._cred_path = "" else: self._cred_path = cred_path if not config_path: self._config_path = "" else: self._config_path = config_path self._cred_provider = cred_provider self._real_connector = None self._hook_dir = '' self._origin_from_file = torch.UntypedStorage.from_file self._origin_open = builtins.open def __del__(self): self.close() @property def _connector(self): if self._real_connector is None: if self._cred_provider is not None: self._real_connector = new_oss_connector(self._endpoint, self._cred_provider, self._config_path) else: self._real_connector = new_oss_connector(self._endpoint, self._cred_path, self._config_path) return self._real_connector def close(self): """ Close the connector and release resources. """ try: if self._hook_dir: self._hook_dir = '' if builtins.open == self._connector_open: builtins.open = self._origin_open if torch.UntypedStorage.from_file == self._from_file_helper: torch.UntypedStorage.from_file = self._origin_from_file if self._real_connector is not None: del self._real_connector self._real_connector = None except: print("exception in close, ignore") def open(self, uri, binary = True): """ Opens an object from OSS storage. Args: uri(str): The uri (oss://{bucket}/{object_name}) of the object to open. binary(bool): Flag indicating whether to open in binary mode or not. Returns: Stream-like object of the opened OSS object. """ return self._connector.open(uri, True, True, binary) def _from_file_helper(self, filename, shared, nbytes): if self._hook_dir and filename.startswith(self._hook_dir): file = self._connector.open(filename, True, True) return UntypedStorageEx(file, nbytes) else: return self._origin_from_file(filename, shared, nbytes) def _connector_open(self, file, mode='r', buffering=-1, encoding=None, errors=None, newline=None, closefd=True, opener=None): if isinstance(file, pathlib.Path): file = str(file) if self._hook_dir and file.startswith(self._hook_dir): binary = False if 'b' in mode: binary = True try: return self.open(file, binary) except: return self._origin_open(file, mode, buffering, encoding, errors, newline, closefd, opener) else: return self._origin_open(file, mode, buffering, encoding, errors, newline, closefd, opener) def prepare_directory(self, uri: str, dir: str, libc_hook: bool = False): """ Prepare the directory from OSS storage, which can be used as directory 'dir' in vllm/transformers or other frameworks. Args: uri(str): The URI (oss://{bucket}/{directory}) of the OSS directory. dir(str): The local directory used for vllm/transformers or other frameworks. libc_hook (bool): Flag to enable libc hooking. Raises: RuntimeError: If prepare directory failed. """ if not dir.endswith('/'): dir += '/' self._connector.prepare_directory(uri, dir, libc_hook) if not libc_hook: builtins.open = self._connector_open torch.UntypedStorage.from_file = self._from_file_helper self._hook_dir = dir def list(self, bucket: str, prefix: str, fast: bool = False): """ Lists objects in a specified OSS bucket with a given prefix. Args: bucket(str): The OSS bucket name. prefix(str): The prefix filter for object listing. fast (bool): If true, enables fast list mode. Returns: List: A list of objects matching the bucket and prefix criteria. """ return self._connector.list(bucket, prefix, fast)