datastore/providers/azurecosmosdb_datastore.py (208 lines of code) (raw):
import logging
import os
import certifi
import numpy as np
import pymongo
from pymongo.mongo_client import MongoClient
from abc import ABC, abstractmethod
from typing import Dict, List, Optional
from datetime import datetime
from datastore.datastore import DataStore
from models.models import (
DocumentChunk,
DocumentMetadataFilter,
DocumentChunkWithScore,
DocumentMetadataFilter,
QueryResult,
QueryWithEmbedding,
)
from services.date import to_unix_timestamp
# Connection settings for the Azure Cosmos DB backend, read once at import time.
# AZCOSMOS_API selects the backing API and falls back to "mongo-vcore" when unset;
# the other three settings have no default and are None when unset.
AZCOSMOS_API = os.environ.get("AZCOSMOS_API", "mongo-vcore")
AZCOSMOS_CONNSTR = os.environ.get("AZCOSMOS_CONNSTR")
AZCOSMOS_DATABASE_NAME = os.environ.get("AZCOSMOS_DATABASE_NAME")
AZCOSMOS_CONTAINER_NAME = os.environ.get("AZCOSMOS_CONTAINER_NAME")
# Fail at import time when a required setting is missing. AZCOSMOS_API always
# has a value because of its default, so its assertion cannot fire.
# NOTE: assert statements are skipped when Python runs with -O.
assert AZCOSMOS_API is not None
assert AZCOSMOS_CONNSTR is not None
assert AZCOSMOS_DATABASE_NAME is not None
assert AZCOSMOS_CONTAINER_NAME is not None
# Number of dimensions configured on the vector index, read from the
# EMBEDDING_DIMENSION environment variable and defaulting to 256.
# NOTE(review): 256 is smaller than the 1536 dimensions produced by OpenAI's
# text-embedding-ada-002 - confirm the default matches the embedding model in use.
VECTOR_DIMENSION = int(os.environ.get("EMBEDDING_DIMENSION", 256))
class AzureCosmosDBStoreApi(ABC):
    """API-level abstraction over the Azure Cosmos DB backends.

    Similar in spirit to the original data store interface: every Cosmos DB
    API that the datastore supports supplies its own implementation of the
    coroutines declared here.
    """

    @abstractmethod
    async def ensure(self, num_lists, similarity):
        """Prepare the underlying store using the given index settings."""
        raise NotImplementedError()

    @abstractmethod
    async def upsert_core(self, docId: str, chunks: List[DocumentChunk]) -> List[str]:
        """Store the chunks of document ``docId`` and return a list of ids."""
        raise NotImplementedError()

    @abstractmethod
    async def query_core(
        self, query: QueryWithEmbedding
    ) -> List[DocumentChunkWithScore]:
        """Run one embedding query and return the scored chunks."""
        raise NotImplementedError()

    @abstractmethod
    async def drop_container(self):
        """Drop the whole container."""
        raise NotImplementedError()

    @abstractmethod
    async def delete_filter(self, filter: DocumentMetadataFilter):
        """Delete the entries selected by a metadata filter."""
        raise NotImplementedError()

    @abstractmethod
    async def delete_ids(self, ids: List[str]):
        """Delete the entries with the given ids."""
        raise NotImplementedError()

    @abstractmethod
    async def delete_document_ids(self, documentIds: List[str]):
        """Delete the entries belonging to the given document ids."""
        raise NotImplementedError()
class MongoStoreApi(AzureCosmosDBStoreApi):
    """Store API implementation backed by a MongoDB client.

    Each chunk is stored as its own document with the keys ``_id``,
    ``document_id``, ``embedding``, ``text`` and ``metadata``.
    ``ensure`` must be awaited before any other method because it is the
    call that binds ``self.collection``.
    """

    def __init__(self, mongoClient: MongoClient):
        """Keep the client; the collection is bound later by ``ensure``."""
        self.mongoClient = mongoClient

    @staticmethod
    def _get_metadata_filter(filter: DocumentMetadataFilter) -> dict:
        """Translate a metadata filter into a MongoDB query document.

        Only fields that are not ``None`` contribute a condition. When both
        ``start_date`` and ``end_date`` are set they are combined into a
        single range condition on ``metadata.created_at``.

        Args:
            filter: The metadata filter to translate.

        Returns:
            A query document; empty when no filter field is set.

        Raises:
            ValueError: If a date is not accepted by
                ``datetime.fromisoformat``.
        """
        returnedFilter: dict = {}
        if filter.document_id is not None:
            returnedFilter["document_id"] = filter.document_id
        if filter.author is not None:
            returnedFilter["metadata.author"] = filter.author

        # Both bounds must share one operator document: assigning the key
        # twice would let end_date silently replace the start_date bound.
        created_at_range: dict = {}
        if filter.start_date is not None:
            created_at_range["$gt"] = datetime.fromisoformat(filter.start_date)
        if filter.end_date is not None:
            created_at_range["$lt"] = datetime.fromisoformat(filter.end_date)
        if created_at_range:
            returnedFilter["metadata.created_at"] = created_at_range

        if filter.source is not None:
            returnedFilter["metadata.source"] = filter.source
        if filter.source_id is not None:
            returnedFilter["metadata.source_id"] = filter.source_id
        return returnedFilter

    async def ensure(self, num_lists, similarity):
        """Bind the collection and create the vector index if it is missing.

        Args:
            num_lists: Value stored as ``numLists`` in the index options.
            similarity: Value stored as ``similarity`` in the index options.
        """
        # NOTE(review): presumably a connectivity/topology sanity check -
        # confirm the target service reports itself as mongos. Being an
        # assert, it is skipped when Python runs with -O.
        assert self.mongoClient.is_mongos
        self.collection = self.mongoClient[AZCOSMOS_DATABASE_NAME][
            AZCOSMOS_CONTAINER_NAME
        ]

        indexes = self.collection.index_information()
        if indexes.get("embedding_cosmosSearch") is not None:
            # The vector index already exists; nothing to create.
            return

        # Ensure the vector index exists.
        indexDefs: List[dict] = [
            {
                "name": "embedding_cosmosSearch",
                "key": {"embedding": "cosmosSearch"},
                "cosmosSearchOptions": {
                    "kind": "vector-ivf",
                    "numLists": num_lists,
                    "similarity": similarity,
                    "dimensions": VECTOR_DIMENSION,
                },
            }
        ]
        self.mongoClient[AZCOSMOS_DATABASE_NAME].command(
            "createIndexes", AZCOSMOS_CONTAINER_NAME, indexes=indexDefs
        )

    async def upsert_core(self, docId: str, chunks: List[DocumentChunk]) -> List[str]:
        """Write every chunk of one document and return the stored ids.

        Until nested doc embedding support is done, each chunk is treated as
        a separate document whose ``_id`` is ``doc:{docId}:chunk:{chunk.id}``.
        An existing document with the same ``_id`` is replaced.

        Args:
            docId: Id of the document the chunks belong to.
            chunks: The chunks to store.

        Returns:
            The ``_id`` of every stored chunk, in input order.
        """
        doc_ids: List[str] = []
        for chunk in chunks:
            # Shallow-copy the metadata so the datetime conversion below does
            # not leak back into the caller's chunk object.
            metadata: dict = dict(chunk.metadata.__dict__)
            if chunk.metadata.created_at is not None:
                metadata["created_at"] = datetime.fromisoformat(
                    chunk.metadata.created_at
                )

            finalDocChunk: dict = {
                "_id": f"doc:{docId}:chunk:{chunk.id}",
                "document_id": docId,
                "embedding": chunk.embedding,
                "text": chunk.text,
                "metadata": metadata,
            }
            # replace_one with upsert=True gives real upsert semantics; a
            # plain insert raises a duplicate-key error on a repeated _id.
            self.collection.replace_one(
                {"_id": finalDocChunk["_id"]}, finalDocChunk, upsert=True
            )
            doc_ids.append(finalDocChunk["_id"])
        return doc_ids

    async def query_core(
        self, query: QueryWithEmbedding
    ) -> List[DocumentChunkWithScore]:
        """Run a vector search for one query and return the scored chunks.

        Args:
            query: Supplies the embedding to search with and ``top_k``.

        Returns:
            One ``DocumentChunkWithScore`` per aggregation result, in the
            order the aggregation yields them.
        """
        pipeline = [
            {
                "$search": {
                    "cosmosSearch": {
                        "vector": query.embedding,
                        "path": "embedding",
                        "k": query.top_k,
                    },
                    "returnStoredSource": True,
                }
            },
            {
                "$project": {
                    "similarityScore": {"$meta": "searchScore"},
                    "document": "$$ROOT",
                }
            },
        ]
        # TODO: Add in match filter (once it can be satisfied).

        # Perform vector search
        query_results: List[DocumentChunkWithScore] = []
        for aggResult in self.collection.aggregate(pipeline):
            document = aggResult["document"]
            finalMetadata = document["metadata"]
            # created_at is written as a datetime by upsert_core but handed
            # back as an ISO string; tolerate it being missing or already in
            # another form instead of raising.
            created_at = finalMetadata.get("created_at")
            if isinstance(created_at, datetime):
                finalMetadata["created_at"] = created_at.isoformat()
            query_results.append(
                DocumentChunkWithScore(
                    id=aggResult["_id"],
                    score=aggResult["similarityScore"],
                    text=document["text"],
                    metadata=finalMetadata,
                )
            )
        return query_results

    async def drop_container(self):
        """Drop the bound collection."""
        self.collection.drop()

    async def delete_filter(self, filter: DocumentMetadataFilter):
        """Delete every chunk matching the metadata filter.

        A filter with no field set deletes nothing: the empty query document
        it translates to would otherwise match the whole collection.
        """
        delete_filter = self._get_metadata_filter(filter)
        if not delete_filter:
            return
        self.collection.delete_many(delete_filter)

    async def delete_ids(self, ids: List[str]):
        """Delete the chunks whose ``_id`` is in ``ids``."""
        self.collection.delete_many({"_id": {"$in": ids}})

    async def delete_document_ids(self, documentIds: List[str]):
        """Delete every chunk whose ``document_id`` is in ``documentIds``."""
        self.collection.delete_many({"document_id": {"$in": documentIds}})
# Datastore implementation.
class AzureCosmosDBDataStore(DataStore):
    """A memory store for Azure CosmosDB DataStore.

    Currently only supports Mongo vCore. Instances are normally obtained
    through the ``create`` coroutine.
    """

    def __init__(self, cosmosStore: AzureCosmosDBStoreApi):
        """Wrap an already prepared store API implementation."""
        self.cosmosStore = cosmosStore

    @staticmethod
    async def create(num_lists, similarity) -> DataStore:
        """Create a datastore for the Cosmos API named in the environment.

        Only supports Mongo vCore for now.

        Args:
            num_lists (int): The number of clusters that the inverted file
                (IVF) index uses to group the vector data. We recommend that
                numLists is set to documentCount/1000 for up to 1 million
                documents and to sqrt(documentCount) for more than 1 million
                documents. Using a numLists value of 1 is akin to performing
                brute-force search, which has limited performance.
            similarity (str): Similarity metric to use with the IVF index.
                Possible options are COS (cosine distance), L2 (Euclidean
                distance), and IP (inner product).

        Returns:
            A datastore whose underlying store has been prepared via
            ``ensure``.

        Raises:
            NotImplementedError: If ``AZCOSMOS_API`` is not ``"mongo-vcore"``.
        """
        # Create underlying data store based on the API definition.
        # Right now this only supports Mongo, but set up to support more.
        if AZCOSMOS_API != "mongo-vcore":
            raise NotImplementedError

        apiStore: AzureCosmosDBStoreApi = MongoStoreApi(MongoClient(AZCOSMOS_CONNSTR))
        await apiStore.ensure(num_lists, similarity)
        return AzureCosmosDBDataStore(apiStore)

    async def _upsert(self, chunks: Dict[str, List[DocumentChunk]]) -> List[str]:
        """Insert the chunks of every document into the database.

        Args:
            chunks: Mapping of document id to that document's chunks.

        Returns:
            The ids returned by the store for all documents, concatenated in
            iteration order.
        """
        doc_ids: List[str] = []
        for doc_id, chunk_list in chunks.items():
            doc_ids.extend(await self.cosmosStore.upsert_core(doc_id, chunk_list))
        return doc_ids

    async def _query(
        self,
        queries: List[QueryWithEmbedding],
    ) -> List[QueryResult]:
        """Run every query and collect the matching chunks with scores.

        Args:
            queries: Queries with embeddings; they are executed one after
                another.

        Returns:
            One ``QueryResult`` per query, in input order.
        """
        results: List[QueryResult] = []
        # logging.info accepts no ``flush`` keyword (that belongs to print);
        # passing it raises TypeError as soon as INFO logging is enabled.
        logging.info("Gathering %d query results", len(queries))
        for query in queries:
            logging.info("Query: %s", query.query)
            query_results = await self.cosmosStore.query_core(query)
            # Add to overall results
            results.append(QueryResult(query=query.query, results=query_results))
        return results

    async def delete(
        self,
        ids: Optional[List[str]] = None,
        filter: Optional[DocumentMetadataFilter] = None,
        delete_all: Optional[bool] = None,
    ) -> bool:
        """Remove vectors by ids, filter, or everything in the datastore.

        ``delete_all`` takes precedence and returns immediately. Otherwise
        the filter (if given) and the ids (if given) are both applied.

        Returns:
            ``True``; failures surface as exceptions from the store.
        """
        if delete_all:
            # fast path - truncate/delete all items.
            await self.cosmosStore.drop_container()
            return True

        if filter:
            if filter.document_id is not None:
                # NOTE(review): on this path only document_id is used; any
                # other field set on the filter is not applied - confirm
                # that this is intended.
                await self.cosmosStore.delete_document_ids([filter.document_id])
            else:
                # NOTE(review): a filter with no field set is forwarded
                # unchanged; what it deletes depends on the store
                # implementation.
                await self.cosmosStore.delete_filter(filter)

        if ids:
            await self.cosmosStore.delete_ids(ids)

        return True