11-recommendation/upload-to-qdrant.py (100 lines of code) (raw):
import csv
import logging
import ast
from typing import List, Dict, Any
from qdrant_client import QdrantClient
from qdrant_client.http.models import VectorParams, PointStruct
from config import Config
# Set up logging: configure the root logger at INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration: connection settings (host, port) are read from the project's Config object.
# Initialize the Qdrant client used by the collection-creation and upload functions below.
# NOTE: this statement runs at import time, not only when the file is executed as a script.
qdrant_client = QdrantClient(host=Config.QDRANT_HOST, port=Config.QDRANT_PORT)
def read_csv_data(csv_file_path: str) -> List[Dict[str, Any]]:
    """
    Reads rows from a CSV file and returns them as a list of dictionaries.

    Each row is kept as produced by ``csv.DictReader`` with two fields normalised:

    * ``price`` is converted to ``float``; an empty, missing, or non-numeric
      value becomes ``None``.
    * ``embedded_description`` is parsed with ``ast.literal_eval``; a value that
      is empty, unparsable, or not a list becomes ``[]``.

    A malformed value only affects its own row; the remaining rows are still read.

    Args:
        csv_file_path: Path of the UTF-8 encoded CSV file to read.

    Returns:
        The parsed rows in file order. If the file cannot be opened or read,
        the error is logged and the rows collected so far (possibly none)
        are returned instead of raising.
    """
    data_entries: List[Dict[str, Any]] = []
    try:
        # newline='' lets the csv module handle line endings itself, which is
        # required for quoted fields that contain embedded newlines.
        with open(csv_file_path, mode='r', encoding='utf-8', newline='') as csvfile:
            reader = csv.DictReader(csvfile)
            for row in reader:
                line_no = reader.line_num

                # Convert the price; a bad value must not abort the whole file.
                raw_price = row.get('price')
                try:
                    row['price'] = float(raw_price) if raw_price else None
                except ValueError:
                    logger.warning(f"Invalid price {raw_price!r} on CSV line {line_no}; storing None.")
                    row['price'] = None

                # Parse the stored embedding literal; fall back to [] when unusable.
                embedding: Any = []
                embedding_str = row.get('embedded_description')
                if embedding_str:
                    try:
                        embedding = ast.literal_eval(embedding_str)
                    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
                        # literal_eval documents all of these for malformed input.
                        logger.error(f"Error parsing embedding on CSV line {line_no}: {e}")
                        embedding = []
                    else:
                        if not isinstance(embedding, list):
                            logger.error(f"Embedding on CSV line {line_no} is not a list.")
                            embedding = []
                row['embedded_description'] = embedding

                data_entries.append(row)
        logger.info(f"Read {len(data_entries)} entries from {csv_file_path}")
    except Exception as e:
        # Deliberate best-effort boundary: callers receive whatever was read.
        logger.error(f"Error reading CSV file {csv_file_path}: {e}")
    return data_entries
def create_qdrant_collection(collection_name: str, vector_size: int, distance_metric: str = 'Cosine'):
    """
    Builds the Qdrant collection that will hold the vectors.

    The collection is set up through the client's ``recreate_collection`` call.
    NOTE(review): per the qdrant-client API this replaces an existing collection
    of the same name - confirm before running against data you want to keep.

    Args:
        collection_name: Name of the collection to (re)create.
        vector_size: Dimensionality configured for the stored vectors.
        distance_metric: Distance setting passed to ``VectorParams``; defaults to 'Cosine'.
    """
    vector_settings = VectorParams(size=vector_size, distance=distance_metric)
    qdrant_client.recreate_collection(
        collection_name=collection_name,
        vectors_config=vector_settings,
    )
    summary = f"Collection '{collection_name}' created with vector size {vector_size} and distance metric '{distance_metric}'."
    logger.info(summary)
def _coerce_embedding(raw_embedding: Any, idx: int) -> List[Any]:
    """Returns the embedding for row ``idx`` as a list, or ``[]`` when it is missing or unusable."""
    if isinstance(raw_embedding, list):
        return raw_embedding
    if not raw_embedding:
        logger.warning(f"Missing embedding for row {idx}. Converting to empty list.")
        return []
    try:
        parsed = ast.literal_eval(raw_embedding)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError) as e:
        logger.error(f"Error parsing embedding for row {idx}: {e}.")
        return []
    if not isinstance(parsed, list):
        logger.warning(f"Embedding for row {idx} is not a list.")
        return []
    return parsed


def _upsert_batch(points: List[Any], label: str) -> bool:
    """Sends one batch to Qdrant; logs the failure and returns False instead of raising."""
    try:
        qdrant_client.upsert(
            collection_name=Config.COLLECTION_NAME,
            points=points
        )
    except Exception as e:
        logger.error(f"Failed to upsert {label} to Qdrant: {e}")
        return False
    logger.info(f"Inserted {label} of {len(points)} points into Qdrant.")
    return True


def insert_data_into_qdrant(data_entries: List[Dict[str, Any]]):
    """
    Inserts data into the Qdrant collection using precomputed embeddings from the CSV.

    Every entry with a usable embedding becomes one point whose id is the
    entry's position in ``data_entries`` and whose payload carries the product
    fields. Points are sent in batches of ``Config.BATCH_SIZE``.

    Entries whose embedding is missing, unparsable, or not a list are skipped
    (and logged) rather than uploaded, so that one bad row cannot make a whole
    batch fail. A batch that fails to upsert is logged and dropped; it is not
    carried over into the following batches.

    Args:
        data_entries: Rows as returned by ``read_csv_data``. The
            ``embedded_description`` value of each entry is replaced in place
            with its normalised list form.
    """
    points = []
    skipped = 0
    failed = 0
    for idx, entry in enumerate(data_entries):
        embedded_description = _coerce_embedding(entry.get('embedded_description'), idx)
        entry['embedded_description'] = embedded_description
        if not embedded_description:
            # An empty vector is not a usable embedding; do not upload it.
            skipped += 1
            continue

        points.append(
            PointStruct(
                id=idx,  # row position, so ids stay stable even when rows are skipped
                vector=embedded_description,
                payload={
                    "product_id": entry.get("id"),
                    "product_name": entry.get("product_name"),
                    "category": entry.get("category"),
                    "price": entry.get("price"),
                    "detailed_description": entry.get("detailed_description"),
                },
            )
        )

        # Batch insertion
        if len(points) >= Config.BATCH_SIZE:
            if not _upsert_batch(points, "batch"):
                failed += len(points)
            # Always start a fresh batch: keeping a failed one would re-send an
            # ever-growing batch on every subsequent row.
            points = []

    # Insert any remaining points
    if points and not _upsert_batch(points, "final batch"):
        failed += len(points)

    if skipped or failed:
        logger.warning(
            f"Skipped {skipped} rows without a usable embedding; "
            f"{failed} points were not stored because their batch failed."
        )
def spin_up_qdrant_database():
    """
    Prepares the Qdrant database end to end.

    First creates the collection named in ``Config``, then loads the rows from
    the configured CSV file and uploads them. When the CSV yields no rows a
    warning is logged and nothing is uploaded.
    """
    create_qdrant_collection(Config.COLLECTION_NAME, Config.VECTOR_SIZE)

    rows = read_csv_data(Config.CSV_FILE_PATH)
    if not rows:
        logger.warning("No data entries found to insert into Qdrant.")
        return

    insert_data_into_qdrant(rows)
# Script entry point: create the collection and upload the CSV data only when
# this file is executed directly, not when it is imported.
if __name__ == "__main__":
    spin_up_qdrant_database()