def _upsert_chunk()

in datastore/providers/analyticdb_datastore.py [0:0]


    def _upsert_chunk(self, chunk: DocumentChunk):
        created_at = (
            datetime.fromtimestamp(to_unix_timestamp(chunk.metadata.created_at))
            if chunk.metadata.created_at
            else None
        )
        data = (
            chunk.id,
            chunk.text,
            chunk.embedding,
            chunk.metadata.document_id,
            chunk.metadata.source,
            chunk.metadata.source_id,
            chunk.metadata.url,
            chunk.metadata.author,
            created_at,
        )

        conn = self.connection_pool.getconn()
        try:
            with conn.cursor() as cur:
                # Construct the SQL query and data
                query = f"""
                        INSERT INTO {self.collection_name} (id, content, embedding, document_id, source, source_id, url, author, created_at)
                        VALUES (%s::text, %s::text, %s::real[], %s::text, %s::text, %s::text, %s::text, %s::text, %s::timestamp with time zone)
                        ON CONFLICT (id) DO UPDATE SET
                            content = EXCLUDED.content,
                            embedding = EXCLUDED.embedding,
                            document_id = EXCLUDED.document_id,
                            source = EXCLUDED.source,
                            source_id = EXCLUDED.source_id,
                            url = EXCLUDED.url,
                            author = EXCLUDED.author,
                            created_at = EXCLUDED.created_at;
                """

                # Execute the query
                cur.execute(query, data)

                # Commit the transaction
                conn.commit()
        finally:
            self.connection_pool.putconn(conn)