runnable-hub/python/runnable_hub/store/oss.py (20 lines of code) (raw):

import os import oss2 from ..interface import RunnableFileStore class RunnableOssFileStore(RunnableFileStore): def __init__(self, accessKey, secretKey, endpoint, bucketName): auth = oss2.Auth(accessKey, secretKey) self.bucket = oss2.Bucket(auth, endpoint, bucketName) def saveFile(self, filePath, content): result = self.bucket.put_object(filePath, content) if result.status != 200: raise Exception(f"Failed to save file to OSS. Status: {result.status}") def readFile(self, filePath) -> str: try: object_stream = self.bucket.get_object(filePath) content = object_stream.read().decode('utf-8') return content except oss2.exceptions.NoSuchKey: raise FileNotFoundError(f"File {filePath} does not exist in OSS") except Exception as e: raise Exception(f"Failed to read file from OSS: {str(e)}")