nuvolaris/milvus_simple_client.py (187 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # from types import NoneType from typing import Optional from requests.exceptions import HTTPError import requests class MilvusSimpleException(Exception): def __init__(self, code: int, message: str): self.message = message self.code = code def __str__(self): return f"{self.message} ({self.code})" class MilvusUnauthorizedException(MilvusSimpleException): def __init__(self): super().__init__(0, "Unauthorized") class MilvusSimpleClient: def __init__(self, uri: str, token: str, db_name: str = None): self.milvus_url = uri self.token = token self.db_name = db_name def _request(self, endpoint: str, json=None, method: str = "POST", api_level="v2"): headers = { "Content-Type": "application/json", "Authorization": f"Bearer {self.token}" } url = f"{self.milvus_url}/{api_level}/vectordb/{endpoint}" if json is None: json = {} response = requests.request(method, url, headers=headers, json=json) try: response.raise_for_status() res = response.json() if type(res.get('data')) is not NoneType: return res['data'] else: raise MilvusSimpleException(code=res.get('code'), message=res.get('message')) except HTTPError as e: if e.response.status_code == 403: raise MilvusUnauthorizedException() def close(self): pass # User operations def create_user(self, user_name: str, password: str): payload = {"userName": user_name, "password": password} return self._request("users/create", json=payload) def describe_user(self, user_name: str): payload = {"userName": user_name} return self._request(f"users/describe", json=payload) def update_password(self, user_name: str, old_password: str, new_password: str): payload = {"userName": user_name, "password": old_password, "newPassword": new_password} return self._request("users/update_password", json=payload) def drop_user(self, user_name: str): payload = {"userName": user_name} return self._request(f"users/drop", json=payload) def list_users(self): return self._request("users/list") def grant_role(self, role_name: str, user_name: str, db_name: Optional[str] = None): payload = {"roleName": role_name, "userName": user_name} if db_name is not None: payload["dbName"] = db_name return self._request(f"users/grant_role", json=payload) def revoke_role(self, role_name: str, user_name: str, db_name: Optional[str] = None): payload = {"roleName": role_name, "userName": user_name} if db_name is not None: payload["dbName"] = db_name return self._request(f"users/revoke_role", json=payload) # Database operations def alter_database(self, db_name: str, properties: dict): payload = {"dbName": db_name, "properties": properties} return self._request("databases/alter", json=payload) def create_database(self, db_name: str, properties: Optional[dict] = None, **kwargs): payload = {"dbName": db_name} if properties is not None: payload["properties"] = properties for k, v in kwargs.items(): payload[k] = v return self._request("databases/create", json=payload) def describe_database(self, db_name: str): payload = {"dbName": db_name} return self._request(f"databases/describe", json=payload) def drop_database(self, db_name: str): payload = {"dbName": db_name} return self._request(f"databases/drop", json=payload) def list_databases(self): return self._request(f"databases/list") # Role operations def create_role(self, role_name: str, db_name: Optional[str] = None): payload = {"roleName": role_name} if db_name is not None: payload["dbName"] = db_name return self._request("roles/create", json=payload) def drop_role(self, role_name: str, db_name: Optional[str] = None): payload = {"roleName": role_name} if db_name is not None: payload["dbName"] = db_name return self._request(f"roles/drop", json=payload) def list_roles(self, db_name: Optional[str] = None): payload = {} if db_name is not None: payload["dbName"] = db_name return self._request(f"roles/list", json=payload) def describe_role(self, role_name: str, db_name: Optional[str] = None): payload = {"roleName": role_name} if db_name is not None: payload["dbName"] = db_name return self._request(f"roles/describe", json=payload) def grant_privilege(self, role_name: str, object_type: str, object_name: str, privilege: str, db_name: Optional[str] = None): payload = { "roleName": role_name, "objectType": object_type, "objectName": object_name, "privilege": privilege } if db_name is not None: payload["dbName"] = db_name return self._request(f"roles/grant_privilege", json=payload) def revoke_privilege(self, role_name: str, object_type: str, object_name: str, privilege: str, db_name: Optional[str] = None): payload = { "roleName": role_name, "objectType": object_type, "objectName": object_name, "privilege": privilege } if db_name is not None: payload["dbName"] = db_name return self._request(f"roles/revoke_privilege", json=payload) def grant_privilege_v2(self, role_name: str, object_type: str, object_name: str, collection_name: str, privilege: str, db_name: Optional[str] = None): payload = { "roleName": role_name, "objectType": object_type, "objectName": object_name, "privilege": privilege, "collectionName": collection_name, } if db_name is not None: payload["dbName"] = db_name return self._request(f"roles/grant_privilege_v2", json=payload) def revoke_privilege_v2(self, role_name: str, object_type: str, object_name: str, collection_name: str, privilege: str, db_name: Optional[str] = None): payload = { "roleName": role_name, "objectType": object_type, "objectName": object_name, "privilege": privilege, "collectionName": collection_name, } if db_name is not None: payload["dbName"] = db_name return self._request(f"roles/revoke_privilege_v2", json=payload) # Collection operations def create_collection(self, collection_name: str, db_name: Optional[str] = None, dimension: Optional[int] = None, primary_field_name: str = "id", # default is "id" id_type: str = "Int64", # or "string", vector_field_name: str = "vector", # default is "vector" metric_type: str = "COSINE", auto_id: bool = False, ): if db_name is None: db_name = self.db_name payload = { "dbName": db_name, "collectionName": collection_name, "dimension": dimension, "metricType": metric_type, "idType": id_type, "autoID": auto_id, "primaryFieldName": primary_field_name, "vectorFieldName": vector_field_name, } return self._request(f"collections/create", json=payload) def drop_collection(self, collection_name: str, db_name: Optional[str] = None, **kwargs): payload = { "collectionName": collection_name, } if db_name is not None: payload["dbName"] = db_name else: payload["dbName"] = self.db_name for k, v in kwargs.items(): payload[k] = v return self._request(f"collections/drop", json=payload) def list_collections(self, db_name: Optional[str] = None, **kwargs): payload = { } if db_name is not None: payload["dbName"] = db_name else: payload["dbName"] = self.db_name for k, v in kwargs.items(): payload[k] = v return self._request(f"collections/list", json=payload)