datastore/providers/zilliz_datastore.py (46 lines of code) (raw):
import os
from loguru import logger
from typing import Optional
from pymilvus import (
connections,
)
from uuid import uuid4
from datastore.providers.milvus_datastore import (
MilvusDataStore,
)
ZILLIZ_COLLECTION = os.environ.get("ZILLIZ_COLLECTION") or "c" + uuid4().hex
ZILLIZ_URI = os.environ.get("ZILLIZ_URI")
ZILLIZ_USER = os.environ.get("ZILLIZ_USER")
ZILLIZ_PASSWORD = os.environ.get("ZILLIZ_PASSWORD")
ZILLIZ_USE_SECURITY = False if ZILLIZ_PASSWORD is None else True
ZILLIZ_CONSISTENCY_LEVEL = os.environ.get("ZILLIZ_CONSISTENCY_LEVEL")
class ZillizDataStore(MilvusDataStore):
def __init__(self, create_new: Optional[bool] = False):
"""Create a Zilliz DataStore.
The Zilliz Datastore allows for storing your indexes and metadata within a Zilliz Cloud instance.
Args:
create_new (Optional[bool], optional): Whether to overwrite if collection already exists. Defaults to True.
"""
# Overwrite the default consistency level by MILVUS_CONSISTENCY_LEVEL
self._consistency_level = ZILLIZ_CONSISTENCY_LEVEL or "Bounded"
self._create_connection()
self._create_collection(ZILLIZ_COLLECTION, create_new) # type: ignore
self._create_index()
def _create_connection(self):
# Check if the connection already exists
try:
i = [
connections.get_connection_addr(x[0])
for x in connections.list_connections()
].index({"address": ZILLIZ_URI, "user": ZILLIZ_USER})
self.alias = connections.list_connections()[i][0]
except ValueError:
# Connect to the Zilliz instance using the passed in Environment variables
self.alias = uuid4().hex
connections.connect(alias=self.alias, uri=ZILLIZ_URI, user=ZILLIZ_USER, password=ZILLIZ_PASSWORD, secure=ZILLIZ_USE_SECURITY) # type: ignore
logger.info("Connect to zilliz cloud server")
def _create_index(self):
try:
# If no index on the collection, create one
if len(self.col.indexes) == 0:
self.index_params = {
"metric_type": "IP",
"index_type": "AUTOINDEX",
"params": {},
}
self.col.create_index("embedding", index_params=self.index_params)
self.col.load()
self.search_params = {"metric_type": "IP", "params": {}}
except Exception as e:
logger.error("Failed to create index, error: {}".format(e))