backend-apis/deployment_scripts/firestore_upload_data.py (81 lines of code) (raw):

# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Upload dataset to Firestore"""
import asyncio
import json
from typing import Literal, TypedDict

from google.cloud import firestore, storage

firestore_client = firestore.AsyncClient()
gcs_client = storage.Client()

GCS_BUCKET = "csm-solution-dataset"

# Maps each target Firestore collection name to the path of the dataset
# object inside GCS_BUCKET that holds its documents.
COLLECTIONS_DICT = {
    "website_reviews": "persona5/product_reviews.jsonl",
    "p5-customers": "persona5/customer_list.json",
    "p5-conversations": "persona5/conversations_search_dataset.jsonl",
    "p5-reviews": "persona5/reviews_search_dataset.jsonl",
}

# Collection whose documents each hold a "reviews" subcollection.
_REVIEWS_COLLECTION = "website_reviews"
# Collection whose records are stored as-is and keyed by "customer_id".
_CUSTOMERS_COLLECTION = "p5-customers"


class ReviewDoc(TypedDict):
    """A dict representing a review document for Firestore"""

    review: str
    sentiment: Literal["positive", "negative", "neutral"]
    stars: int


async def review_upload(product_id: str, review: ReviewDoc) -> None:
    """Upload a single review under its product document.

    The review is written as a new document (no explicit ID is passed, so
    Firestore assigns one) in the ``reviews`` subcollection of
    ``website_reviews/{product_id}``.

    Args:
        product_id: Product ID, used as the parent document ID.
        review: Review data to store.
    """
    product_ref = firestore_client.collection(_REVIEWS_COLLECTION).document(
        product_id
    )
    await product_ref.collection("reviews").document().set(review)


async def upload_reviews(review_docs: dict[str, list[ReviewDoc]]) -> None:
    """Upload every review of every product concurrently.

    Args:
        review_docs: Mapping of product ID to the list of reviews to store
            under that product.
    """
    # NOTE(review): all writes are scheduled at once; consider bounding the
    # concurrency if the dataset grows large.
    await asyncio.gather(
        *(
            review_upload(product_id, review)
            for product_id, reviews in review_docs.items()
            for review in reviews
        )
    )


async def p5_upload(collection_name, document_id: str, data: dict) -> None:
    """Upload a single p5 Firestore document.

    Args:
        collection_name: Name of the Firestore collection to write to.
        document_id: Document ID.
        data: Document data.
    """
    collection_ref = firestore_client.collection(collection_name)
    await collection_ref.document(document_id).set(data)


async def upload_p5_documents(
    collection_name, documents: dict[str, dict]
) -> None:
    """Upload all documents of a flat p5 collection concurrently.

    Args:
        collection_name: Name of the Firestore collection to write to.
        documents: Mapping of document ID to document data.
    """
    await asyncio.gather(
        *(
            p5_upload(collection_name, document_id, data)
            for document_id, data in documents.items()
        )
    )


def _parse_json_lines(text: str) -> list:
    """Parse JSON Lines text into a list, one decoded value per line.

    Blank or whitespace-only lines are skipped so that stray empty lines in
    a dataset file do not raise ``json.JSONDecodeError``.
    """
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def _group_reviews(records: list) -> dict[str, list[ReviewDoc]]:
    """Group raw review records by their ``id`` field (the product ID)."""
    grouped: dict[str, list[ReviewDoc]] = {}
    for record in records:
        # Keep only the fields that belong in the stored review document.
        review_doc: ReviewDoc = {
            "review": record["review"],
            "sentiment": record["sentiment"],
            "stars": record["stars"],
        }
        grouped.setdefault(record["id"], []).append(review_doc)
    return grouped


def _flatten_documents(collection_name, records: list) -> dict[str, dict]:
    """Key the records of a flat collection by their document ID.

    Customer records are stored unchanged and keyed by ``customer_id``.
    Records of every other collection are keyed by ``id`` and carry their
    payload as a JSON-encoded string in ``jsonData``, which is decoded here.
    """
    if collection_name == _CUSTOMERS_COLLECTION:
        return {record["customer_id"]: record for record in records}
    return {
        record["id"]: json.loads(record["jsonData"]) for record in records
    }


async def main() -> None:
    """Upload documents to firestore.

    Downloads every dataset listed in ``COLLECTIONS_DICT`` from
    ``GCS_BUCKET``, converts it into Firestore documents, and only then
    uploads the collections one after another.
    """
    bucket = gcs_client.get_bucket(GCS_BUCKET)
    collection_docs = {}

    print("Downloading documents")
    # Transform JSONL to Documents
    for collection_name, collection_uri in COLLECTIONS_DICT.items():
        print(f"Downloading {collection_uri}")
        records = _parse_json_lines(
            bucket.blob(collection_uri).download_as_text()
        )
        if collection_name == _REVIEWS_COLLECTION:
            # Website reviews uses collections of collections
            collection_docs[collection_name] = _group_reviews(records)
        else:
            # Other collections are flat
            collection_docs[collection_name] = _flatten_documents(
                collection_name, records
            )

    # Upload collections
    for name, documents in collection_docs.items():
        print(f"Uploading collection: {name}")
        if name == _REVIEWS_COLLECTION:
            await upload_reviews(documents)
        else:
            await upload_p5_documents(name, documents)


if __name__ == "__main__":
    asyncio.run(main())