in datastore/providers/analyticdb_datastore.py [0:0]
def _upsert_chunk(self, chunk: DocumentChunk):
created_at = (
datetime.fromtimestamp(to_unix_timestamp(chunk.metadata.created_at))
if chunk.metadata.created_at
else None
)
data = (
chunk.id,
chunk.text,
chunk.embedding,
chunk.metadata.document_id,
chunk.metadata.source,
chunk.metadata.source_id,
chunk.metadata.url,
chunk.metadata.author,
created_at,
)
conn = self.connection_pool.getconn()
try:
with conn.cursor() as cur:
# Construct the SQL query and data
query = f"""
INSERT INTO {self.collection_name} (id, content, embedding, document_id, source, source_id, url, author, created_at)
VALUES (%s::text, %s::text, %s::real[], %s::text, %s::text, %s::text, %s::text, %s::text, %s::timestamp with time zone)
ON CONFLICT (id) DO UPDATE SET
content = EXCLUDED.content,
embedding = EXCLUDED.embedding,
document_id = EXCLUDED.document_id,
source = EXCLUDED.source,
source_id = EXCLUDED.source_id,
url = EXCLUDED.url,
author = EXCLUDED.author,
created_at = EXCLUDED.created_at;
"""
# Execute the query
cur.execute(query, data)
# Commit the transaction
conn.commit()
finally:
self.connection_pool.putconn(conn)