colab-enterprise/Synthetic-Data-Generation-Customers-Reviews.ipynb (1,036 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"metadata": {
"id": "clfviNVNXpqj"
},
"source": [
"### <font color='#4285f4'>Overview</font>"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Generates customer reviews for menu items, encompassing positive, neutral, and negative sentiments.\n",
"\n",
"Process Flow:\n",
"\n",
"1. Create BigQuery table\n",
"2. Get the table schema which will be passed into the prompt so Gemini has context for each column\n",
"3. Get the current review id since it is an integer primary key and we will need to increment\n",
"4. Get the existing menu items so reviews are for our menu\n",
"5. Set a sentiment variable: “Neutral”, “Positive” or “Neutral” to generate different type of reviews\n",
"6. Use Gemini Pro to generate the review and return the results as JSON\n",
"7. Parse the returned JSON and insert the review into BigQuery\n",
"\n",
"Cost:\n",
"* Low: Gemini, BigQuery\n",
"* Medium: Remember to stop your Colab Enterprise Notebook Runtime\n",
"\n",
"Author: \n",
"* Adam Paternostro"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "W3Q8_NLjj3Xw"
},
"source": [
"### <font color='#4285f4'>License</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "qJxJqWa1j3Xw"
},
"outputs": [],
"source": [
"##################################################################################\n",
"# Copyright 2024 Google LLC\n",
"#\n",
"# Licensed under the Apache License, Version 2.0 (the \"License\");\n",
"# you may not use this file except in compliance with the License.\n",
"# You may obtain a copy of the License at\n",
"#\n",
"# https://www.apache.org/licenses/LICENSE-2.0\n",
"#\n",
"# Unless required by applicable law or agreed to in writing, software\n",
"# distributed under the License is distributed on an \"AS IS\" BASIS,\n",
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n",
"# See the License for the specific language governing permissions and\n",
"# limitations under the License.\n",
"###################################################################################"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "uSra7USNj3Xx"
},
"source": [
"### <font color='#4285f4'>Pip Installs</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "IBtPiyK3kKwS"
},
"outputs": [],
"source": [
"# PIP Installs\n",
"import sys\n",
"\n",
"# https://PLACEHOLDER.com/index.html\n",
"#!{sys.executable} -m pip install PLACEHOLDER"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "DszuLZoo9A7k"
},
"source": [
"### <font color='#4285f4'>Initialize</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "bhKxJadjWa1R"
},
"outputs": [],
"source": [
"from PIL import Image\n",
"import IPython.display\n",
"import google.auth\n",
"import requests\n",
"import json\n",
"import uuid\n",
"import base64\n",
"import os\n",
"import cv2\n",
"import random\n",
"import time\n",
"import datetime\n",
"import base64\n",
"import random\n",
"\n",
"import logging\n",
"from tenacity import retry, wait_exponential, stop_after_attempt, before_sleep_log, retry_if_exception"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "MSC6-rboip3h"
},
"outputs": [],
"source": [
"# Set these (run this cell to verify the output)\n",
"\n",
"bigquery_location = \"${bigquery_location}\"\n",
"region = \"${region}\"\n",
"location = \"${location}\"\n",
"storage_account = \"${chocolate_ai_bucket}\"\n",
"public_storage_storage_account = \"data-analytics-golden-demo\"\n",
"table_name = \"customer_review\"\n",
"dataset_name = \"${bigquery_chocolate_ai_dataset}\"\n",
"\n",
"# Get the current date and time\n",
"now = datetime.datetime.now()\n",
"\n",
"# Format the date and time as desired\n",
"formatted_date = now.strftime(\"%Y-%m-%d-%H-%M\")\n",
"\n",
"# Get some values using gcloud\n",
"project_id = !(gcloud config get-value project)\n",
"user = !(gcloud auth list --filter=status:ACTIVE --format=\"value(account)\")\n",
"\n",
"if len(project_id) != 1:\n",
" raise RuntimeError(f\"project_id is not set: {project_id}\")\n",
"project_id = project_id[0]\n",
"\n",
"if len(user) != 1:\n",
" raise RuntimeError(f\"user is not set: {user}\")\n",
"user = user[0]\n",
"\n",
"print(f\"project_id = {project_id}\")\n",
"print(f\"user = {user}\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "0mSjSjjylFnR"
},
"source": [
"### <font color='#4285f4'>Helper Methods</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "be6n7F9IkdbT"
},
"source": [
"#### restAPIHelper\n",
"Calls the Google Cloud REST API using the current users credentials."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "w_QjQhvWkesy"
},
"outputs": [],
"source": [
"def restAPIHelper(url: str, http_verb: str, request_body: str) -> str:\n",
" \"\"\"Calls the Google Cloud REST API passing in the current users credentials\"\"\"\n",
"\n",
" import requests\n",
" import google.auth\n",
" import json\n",
"\n",
" # Get an access token based upon the current user\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request()\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" if http_verb == \"GET\":\n",
" response = requests.get(url, headers=headers)\n",
" elif http_verb == \"POST\":\n",
" response = requests.post(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PUT\":\n",
" response = requests.put(url, json=request_body, headers=headers)\n",
" elif http_verb == \"PATCH\":\n",
" response = requests.patch(url, json=request_body, headers=headers)\n",
" elif http_verb == \"DELETE\":\n",
" response = requests.delete(url, headers=headers)\n",
" else:\n",
" raise RuntimeError(f\"Unknown HTTP verb: {http_verb}\")\n",
"\n",
" if response.status_code == 200:\n",
" return json.loads(response.content)\n",
" #image_data = json.loads(response.content)[\"predictions\"][0][\"bytesBase64Encoded\"]\n",
" else:\n",
" error = f\"Error restAPIHelper -> ' Status: '{response.status_code}' Text: '{response.text}'\"\n",
" raise RuntimeError(error)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### RetryCondition (for retrying LLM calls)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def RetryCondition(error):\n",
" error_string = str(error)\n",
" print(error_string)\n",
"\n",
" retry_errors = [\n",
" \"RESOURCE_EXHAUSTED\",\n",
" \"No content in candidate\",\n",
" # Add more error messages here as needed\n",
" ]\n",
"\n",
" for retry_error in retry_errors:\n",
" if retry_error in error_string:\n",
" print(\"Retrying...\")\n",
" return True\n",
"\n",
" return False"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "UCSP64xskgje"
},
"source": [
"#### Gemini LLM (Pro 1.0 , Pro 1.5)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "6M2ojhjYkhjy"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def GeminiLLM(prompt, model = \"gemini-2.0-flash\", response_schema = None,\n",
" temperature = 1, topP = 1, topK = 32):\n",
"\n",
" # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models\n",
" # model = \"gemini-2.0-flash\"\n",
"\n",
" llm_response = None\n",
" if temperature < 0:\n",
" temperature = 0\n",
"\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request() # required to acess access token\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference\n",
" url = f\"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent\"\n",
"\n",
" generation_config = {\n",
" \"temperature\": temperature,\n",
" \"topP\": topP,\n",
" \"maxOutputTokens\": 8192,\n",
" \"candidateCount\": 1,\n",
" \"responseMimeType\": \"application/json\",\n",
" }\n",
"\n",
" # Add inthe response schema for when it is provided\n",
" if response_schema is not None:\n",
" generation_config[\"responseSchema\"] = response_schema\n",
"\n",
" if model == \"gemini-2.0-flash\":\n",
" generation_config[\"topK\"] = topK\n",
"\n",
" payload = {\n",
" \"contents\": {\n",
" \"role\": \"user\",\n",
" \"parts\": {\n",
" \"text\": prompt\n",
" },\n",
" },\n",
" \"generation_config\": {\n",
" **generation_config\n",
" },\n",
" \"safety_settings\": {\n",
" \"category\": \"HARM_CATEGORY_SEXUALLY_EXPLICIT\",\n",
" \"threshold\": \"BLOCK_LOW_AND_ABOVE\"\n",
" }\n",
" }\n",
"\n",
" response = requests.post(url, json=payload, headers=headers)\n",
"\n",
" if response.status_code == 200:\n",
" try:\n",
" json_response = json.loads(response.content)\n",
" except Exception as error:\n",
" raise RuntimeError(f\"An error occurred parsing the JSON: {error}\")\n",
"\n",
" if \"candidates\" in json_response:\n",
" candidates = json_response[\"candidates\"]\n",
" if len(candidates) > 0:\n",
" candidate = candidates[0]\n",
" if \"content\" in candidate:\n",
" content = candidate[\"content\"]\n",
" if \"parts\" in content:\n",
" parts = content[\"parts\"]\n",
" if len(parts):\n",
" part = parts[0]\n",
" if \"text\" in part:\n",
" text = part[\"text\"]\n",
" llm_response = text\n",
" else:\n",
" raise RuntimeError(\"No text in part: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No parts in content: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No parts in content: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No content in candidate: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No candidates: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No candidates: {response.content}\")\n",
"\n",
" # Remove some typically response characters (if asking for a JSON reply)\n",
" llm_response = llm_response.replace(\"```json\",\"\")\n",
" llm_response = llm_response.replace(\"```\",\"\")\n",
" llm_response = llm_response.replace(\"\\n\",\"\")\n",
"\n",
" return llm_response\n",
"\n",
" else:\n",
" raise RuntimeError(f\"Error with prompt:'{prompt}' Status:'{response.status_code}' Text:'{response.text}'\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "vbNakk0fkjTj"
},
"source": [
"#### Gemini LLM - Multimodal"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "3H0r26TxklPu"
},
"outputs": [],
"source": [
"@retry(wait=wait_exponential(multiplier=1, min=1, max=60), stop=stop_after_attempt(10), retry=retry_if_exception(RetryCondition), before_sleep=before_sleep_log(logging.getLogger(), logging.INFO))\n",
"def GeminiLLM_Multimodal(multimodal_prompt_list, model = \"gemini-2.0-flash\", response_schema = None,\n",
" temperature = 1, topP = 1, topK = 32):\n",
"\n",
" # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference#supported_models\n",
" # model = \"gemini-2.0-flash\"\n",
"\n",
" llm_response = None\n",
" if temperature < 0:\n",
" temperature = 0\n",
"\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request() # required to acess access token\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" # https://cloud.google.com/vertex-ai/generative-ai/docs/model-reference/inference\n",
" url = f\"https://{location}-aiplatform.googleapis.com/v1/projects/{project_id}/locations/{location}/publishers/google/models/{model}:generateContent\"\n",
"\n",
" generation_config = {\n",
" \"temperature\": temperature,\n",
" \"topP\": topP,\n",
" \"maxOutputTokens\": 8192,\n",
" \"candidateCount\": 1,\n",
" \"responseMimeType\": \"application/json\",\n",
" }\n",
"\n",
" # Add inthe response schema for when it is provided\n",
" if response_schema is not None:\n",
" generation_config[\"responseSchema\"] = response_schema\n",
"\n",
" if model == \"gemini-2.0-flash\":\n",
" generation_config[\"topK\"] = topK\n",
"\n",
" payload = {\n",
" \"contents\": {\n",
" \"role\": \"user\",\n",
" \"parts\": multimodal_prompt_list\n",
" },\n",
" \"generation_config\": {\n",
" **generation_config\n",
" },\n",
" \"safety_settings\": {\n",
" \"category\": \"HARM_CATEGORY_SEXUALLY_EXPLICIT\",\n",
" \"threshold\": \"BLOCK_LOW_AND_ABOVE\"\n",
" }\n",
" }\n",
"\n",
" response = requests.post(url, json=payload, headers=headers)\n",
"\n",
" if response.status_code == 200:\n",
" try:\n",
" json_response = json.loads(response.content)\n",
" except Exception as error:\n",
" raise RuntimeError(f\"An error occurred parsing the JSON: {error}\")\n",
"\n",
" if \"candidates\" in json_response:\n",
" candidates = json_response[\"candidates\"]\n",
" if len(candidates) > 0:\n",
" candidate = candidates[0]\n",
" if \"content\" in candidate:\n",
" content = candidate[\"content\"]\n",
" if \"parts\" in content:\n",
" parts = content[\"parts\"]\n",
" if len(parts):\n",
" part = parts[0]\n",
" if \"text\" in part:\n",
" text = part[\"text\"]\n",
" llm_response = text\n",
" else:\n",
" raise RuntimeError(\"No text in part: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No parts in content: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No parts in content: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No content in candidate: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No candidates: {response.content}\")\n",
" else:\n",
" raise RuntimeError(\"No candidates: {response.content}\")\n",
"\n",
" # Remove some typically response characters (if asking for a JSON reply)\n",
" llm_response = llm_response.replace(\"```json\",\"\")\n",
" llm_response = llm_response.replace(\"```\",\"\")\n",
" llm_response = llm_response.replace(\"\\n\",\"\")\n",
"\n",
" return llm_response\n",
"\n",
" else:\n",
" raise RuntimeError(f\"Error with prompt:'{prompt}' Status:'{response.status_code}' Text:'{response.text}'\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "RtE_Q-H6lJQT"
},
"source": [
"#### Imagen3 Image Generation"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "hnfwd_iplO9c"
},
"outputs": [],
"source": [
"def ImageGen(prompt):\n",
" creds, project = google.auth.default()\n",
" auth_req = google.auth.transport.requests.Request()\n",
" creds.refresh(auth_req)\n",
" access_token=creds.token\n",
"\n",
" headers = {\n",
" \"Content-Type\" : \"application/json\",\n",
" \"Authorization\" : \"Bearer \" + access_token\n",
" }\n",
"\n",
" model_version = \"imagen-3.0-generate-001\" # imagen-3.0-fast-generate-001\n",
" #model_version = \"imagen-3.0-generate-preview-0611\" # Preview Access Model\n",
"\n",
" # https://cloud.google.com/vertex-ai/docs/generative-ai/model-reference/image-generation\n",
" # url = f\"https://{location}-aiplatform.googleapis.com/v1/projects/{project}/locations/{location}/publishers/google/models/imagegeneration:predict\"\n",
" url = f\"https://{location}-aiplatform.googleapis.com/v1/projects/{project}/locations/{location}/publishers/google/models/{model_version}:predict\"\n",
"\n",
" payload = {\n",
" \"instances\": [\n",
" {\n",
" \"prompt\": prompt\n",
" }\n",
" ],\n",
" \"parameters\": {\n",
" \"sampleCount\": 1,\n",
" \"personGeneration\" : \"dont_allow\" # change to allow_adult for people generation\n",
" }\n",
" }\n",
"\n",
" response = requests.post(url, json=payload, headers=headers)\n",
"\n",
" if response.status_code == 200:\n",
" response_json = json.loads(response.content)\n",
" # print(f\"Imagen3 response_json: {response_json}\")\n",
"\n",
" if \"blocked\" in response_json:\n",
" print(f\"Blocked: {response_json['blocked']}\")\n",
"\n",
" if \"predictions\" in response_json:\n",
" image_data = response_json[\"predictions\"][0][\"bytesBase64Encoded\"]\n",
" image_data = base64.b64decode(image_data)\n",
" filename= str(uuid.uuid4()) + \".png\"\n",
" with open(filename, \"wb\") as f:\n",
" f.write(image_data)\n",
" print(f\"Image generated OK.\")\n",
" return filename\n",
" else:\n",
" raise RuntimeError(f\"No predictions in response: {response.content}\")\n",
" else:\n",
" error = f\"Error with prompt:'{prompt}' Status:'{response.status_code}' Text:'{response.text}'\"\n",
" raise RuntimeError(error)"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ewLQxnZ0kl6e"
},
"source": [
"#### Helper Functions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "WQ-j3CTRlXo_"
},
"outputs": [],
"source": [
"def RunQuery(sql):\n",
" import time\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client()\n",
"\n",
" if (sql.startswith(\"SELECT\") or sql.startswith(\"WITH\")):\n",
" df_result = client.query(sql).to_dataframe()\n",
" return df_result\n",
" else:\n",
" job_config = bigquery.QueryJobConfig(priority=bigquery.QueryPriority.INTERACTIVE)\n",
" query_job = client.query(sql, job_config=job_config)\n",
"\n",
" # Check on the progress by getting the job's updated state.\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" while query_job.state != \"DONE\":\n",
" time.sleep(2)\n",
" query_job = client.get_job(\n",
" query_job.job_id, location=query_job.location\n",
" )\n",
" print(\"Job {} is currently in state {} with error result of {}\".format(query_job.job_id, query_job.state, query_job.error_result))\n",
"\n",
" if query_job.error_result == None:\n",
" return True\n",
" else:\n",
" raise Exception(query_job.error_result)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "Y6gEjHLNKLGM"
},
"outputs": [],
"source": [
"def GetTableSchema(project_id, dataset_name, table_name):\n",
" import io\n",
" from google.cloud import bigquery\n",
"\n",
" client = bigquery.Client()\n",
"\n",
" dataset_ref = client.dataset(dataset_name, project=project_id)\n",
" table_ref = dataset_ref.table(table_name)\n",
" table = client.get_table(table_ref)\n",
"\n",
" f = io.StringIO(\"\")\n",
" client.schema_to_json(table.schema, f)\n",
" return f.getvalue()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "tUWD4t8gKWO-"
},
"outputs": [],
"source": [
"def GetDistinctValues(project_id, dataset_name, table_name, field_name):\n",
" from google.cloud import bigquery\n",
"\n",
" client = bigquery.Client()\n",
"\n",
" sql = f\"\"\"\n",
" SELECT STRING_AGG(DISTINCT {field_name}, \",\" ) AS result\n",
" FROM `{project_id}.{dataset_name}.{table_name}`\n",
" \"\"\"\n",
"\n",
" df_result = client.query(sql).to_dataframe()\n",
" result_str = df_result['result'].iloc[0]\n",
" if result_str is None:\n",
" return \"\"\n",
" else:\n",
" return result_str"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "zdofv6udKfmF"
},
"outputs": [],
"source": [
"def GetStartingValue(project_id, dataset_name, table_name, field_name):\n",
" from google.cloud import bigquery\n",
"\n",
" client = bigquery.Client()\n",
"\n",
" sql = f\"\"\"\n",
" SELECT IFNULL(MAX({field_name}),0) + 1 AS result\n",
" FROM `{project_id}.{dataset_name}.{table_name}`\n",
" \"\"\"\n",
"\n",
" df_result = client.query(sql).to_dataframe()\n",
" return df_result['result'].iloc[0]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def GetMaxValue(fully_qualified_table_name, field_name):\n",
" from google.cloud import bigquery\n",
" client = bigquery.Client()\n",
" sql = f\"\"\"\n",
" SELECT IFNULL(MAX({field_name}),0) AS result\n",
" FROM `{fully_qualified_table_name}`\n",
" \"\"\"\n",
" # print(sql)\n",
" df_result = client.query(sql).to_dataframe()\n",
" # display(df_result)\n",
" return int(df_result['result'].iloc[0])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "ONcezrrfkplE"
},
"outputs": [],
"source": [
"def PrettyPrintJson(json_string):\n",
" json_object = json.loads(json_string)\n",
" json_formatted_str = json.dumps(json_object, indent=2)\n",
" #print(json_formatted_str)\n",
" return json_formatted_str"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "k3tXA3sDlbak"
},
"outputs": [],
"source": [
"# This was generated by GenAI\n",
"\n",
"def copy_file_to_gcs(local_file_path, bucket_name, destination_blob_name):\n",
" \"\"\"Copies a file from a local drive to a GCS bucket.\n",
"\n",
" Args:\n",
" local_file_path: The full path to the local file.\n",
" bucket_name: The name of the GCS bucket to upload to.\n",
" destination_blob_name: The desired name of the uploaded file in the bucket.\n",
"\n",
" Returns:\n",
" None\n",
" \"\"\"\n",
"\n",
" import os\n",
" from google.cloud import storage\n",
"\n",
" # Ensure the file exists locally\n",
" if not os.path.exists(local_file_path):\n",
" raise FileNotFoundError(f\"Local file '{local_file_path}' not found.\")\n",
"\n",
" # Create a storage client\n",
" storage_client = storage.Client()\n",
"\n",
" # Get a reference to the bucket\n",
" bucket = storage_client.bucket(bucket_name)\n",
"\n",
" # Create a blob object with the desired destination path\n",
" blob = bucket.blob(destination_blob_name)\n",
"\n",
" # Upload the file from the local filesystem\n",
" content_type = \"\"\n",
" if local_file_path.endswith(\".html\"):\n",
" content_type = \"text/html; charset=utf-8\"\n",
"\n",
" if local_file_path.endswith(\".json\"):\n",
" content_type = \"application/json; charset=utf-8\"\n",
"\n",
" if content_type == \"\":\n",
" blob.upload_from_filename(local_file_path)\n",
" else:\n",
" blob.upload_from_filename(local_file_path, content_type = content_type)\n",
"\n",
" print(f\"File '{local_file_path}' uploaded to GCS bucket '{bucket_name}' as '{destination_blob_name}. Content-Type: {content_type}'.\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "NjoOh66Pld1z"
},
"source": [
"### <font color='#4285f4'>BigQuery Table</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "wSE1y7h9lgPk"
},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"/* Fields needed to generate customer marketing profile:\n",
"customer_review_id\n",
"COUNTIF(review_sentiment = 'Positive') AS positive_reviews,\n",
"COUNTIF(review_sentiment = 'Negative') AS negative_reviews,\n",
"COUNTIF(review_sentiment = 'Neutral') AS neutral_reviews\n",
"*/\n",
"\n",
"--drop table if exists `chocolate_ai.customer_review`;\n",
"CREATE TABLE IF NOT EXISTS `chocolate_ai.customer_review`\n",
"(\n",
" customer_review_id INTEGER NOT NULL OPTIONS(description=\"Primary key.\"),\n",
" customer_id INTEGER NOT NULL OPTIONS(description=\"Foreign key: Customer table\"),\n",
" review_datetime TIMESTAMP NOT NULL OPTIONS(description=\"Date and time of the review.\"),\n",
" review_text STRING NOT NULL OPTIONS(description=\"The customer's review of the chocolate.\"),\n",
" review_sentiment STRING OPTIONS(description=\"The sentiment of the review text.\"),\n",
" social_media_channel STRING OPTIONS(description=\"Where on social media the review was posted\"),\n",
")\n",
"CLUSTER BY customer_id;"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "n9frOkAu-7pl"
},
"outputs": [],
"source": [
"def GetMenuItems():\n",
" sql = f\"\"\"SELECT TO_JSON(STRUCT(menu_name, menu_description)) AS menu_item_json\n",
" FROM `chocolate_ai.menu`\"\"\"\n",
"\n",
" result_df = RunQuery(sql)\n",
" result_str = \"\"\n",
"\n",
" for index, row in result_df.iterrows():\n",
" result_str = result_str + row['menu_item_json'] + \",\"\n",
"\n",
" return '[' + result_str + ']'"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "pnhsRHxckr2D"
},
"source": [
"### <font color='#4285f4'>Generate Customer Reviews</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "GfUSulBPHWAU"
},
"outputs": [],
"source": [
"# Write me the json in OpenAPI 3.0 schema object for the below object.\n",
"# Make all fields required.\n",
"# {\n",
"# \"review_datetime\" : \"text\",\n",
"# \"review_text\" : 2000,\n",
"# \"social_media_channel\" : \"text\"\n",
"# }\n",
"response_schema = {\n",
" \"type\": \"object\",\n",
" \"required\": [\n",
" \"review_datetime\",\n",
" \"review_text\",\n",
" \"social_media_channel\"\n",
" ],\n",
" \"properties\": {\n",
" \"review_datetime\": {\n",
" \"type\": \"string\",\n",
" \"format\": \"date-time\"\n",
" },\n",
" \"review_text\": {\n",
" \"type\": \"string\"\n",
" },\n",
" \"social_media_channel\": {\n",
" \"type\": \"string\"\n",
" }\n",
" }\n",
"}\n",
"\n",
"table_schema = GetTableSchema(project_id, dataset_name, table_name)\n",
"customer_review_id = GetStartingValue(project_id, dataset_name, table_name, \"customer_review_id\")\n",
"existing_menu_items = GetMenuItems()\n",
"max_number_of_customers = GetMaxValue(f\"{project_id}.{dataset_name}.customer\",\"customer_id\")\n",
"\n",
"# 11,000 customers to generate\n",
"\n",
"# The generate specific customer ids or to regenerate a customer run this code:\n",
"# customer_review_id = 1\n",
"# for customer_review_id in range(customer_review_id, customer_review_id + 1):\n",
"\n",
"for customer_review_id in range(customer_review_id, customer_review_id + 11000):\n",
" success = False\n",
" while not success:\n",
" try:\n",
" rand_int = random.randint(1, 100)\n",
" customer_id = random.randint(1, max_number_of_customers)\n",
"\n",
" review_sentiment = \"Neutral\"\n",
" if rand_int >= 1 and rand_int <= 75:\n",
" review_sentiment = \"Positive\"\n",
" elif rand_int >= 76 and rand_int <= 90:\n",
" review_sentiment = \"Negative\"\n",
" else:\n",
" review_sentiment = \"Neutral\"\n",
"\n",
" prompt = f\"\"\"You are a database engineer and need to generate data for a table for the below schema.\n",
" You need to generate a review with a {review_sentiment} sentiment.\n",
" You need to generate reviews for customers who have purchased one of the below menu items.\n",
" The customer name should be unique and should reflect a native name in the customer's country.\n",
" Read the description of each field for valid values.\n",
" Encourage unconventional ideas and fresh perspectives and inspires unique variations when creating the customer's name.\n",
" The review_datetime should be in the Google BigQuery format of: yyyy-mm-ddThh:mm:ss and be between 2020-01-01T00:00:00 and 2024-12-31T23:59:59.\n",
"\n",
" Here is the table schema:\n",
" <schema>\n",
" {table_schema}\n",
" </schema>\n",
"\n",
" Here are the menu item names and descriptions you can use for your reviews:\n",
" <existing_menu_items>\n",
" {existing_menu_items}\n",
" </existing_menu_items>\n",
" \"\"\"\n",
"\n",
" # Use LLM to generate data\n",
" customer_review_response = GeminiLLM(prompt, response_schema=response_schema)\n",
"\n",
" # Parse response (we know the JSON since we passed it to our LLM)\n",
" customer_review_response = json.loads(customer_review_response)\n",
" print(json.dumps(customer_review_response, indent=2))\n",
" review_datetime = customer_review_response[\"review_datetime\"].replace(\"'\",\"\\\\'\").replace(\"\\n\", \" \")\n",
" review_text = customer_review_response[\"review_text\"].replace(\"'\",\"\\\\'\").replace(\"\\n\", \" \")\n",
" social_media_channel = customer_review_response[\"social_media_channel\"].replace(\"'\",\"\\\\'\").replace(\"\\n\", \" \")\n",
"\n",
" # Optional, only needed if regenerating a menu item\n",
" sql = f\"DELETE FROM `{project_id}.{dataset_name}.{table_name}` WHERE customer_review_id = {customer_review_id}\"\n",
" #RunQuery(sql)\n",
"\n",
" # Insert to BigQuery\n",
" # Returning a known json schema and then generating an insert statement seems more reliable then having the LLM generating the SQL\n",
" sql = f\"\"\"INSERT INTO `{project_id}.{dataset_name}.{table_name}`\n",
" (customer_review_id, customer_id, review_datetime, review_text, review_sentiment, social_media_channel)\n",
" VALUES ({customer_review_id}, {customer_id}, '{review_datetime}', '{review_text}', '{review_sentiment}', '{social_media_channel}')\"\"\"\n",
"\n",
" RunQuery(sql)\n",
"\n",
" success = True\n",
" except Exception as error:\n",
" print(f\"An error occurred: {error}\")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"%%bigquery\n",
"\n",
"SELECT * \n",
" FROM `${bigquery_chocolate_ai_dataset}.customer_review`\n",
" ORDER BY customer_review_id DESC \n",
" LIMIT 10;"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "FIORnEg5kuNd"
},
"source": [
"### <font color='#4285f4'>Clean Up</font>"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "xZgYyEP9kuhj"
},
"outputs": [],
"source": [
"# Placeholder"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "BJHuX_CwkuzM"
},
"source": [
"### <font color='#4285f4'>Reference Links</font>"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "4eD2DhqttX_5"
},
"source": [
"- [Google.com](https://www.google.com)"
]
}
],
"metadata": {
"colab": {
"collapsed_sections": [
"W3Q8_NLjj3Xw",
"uSra7USNj3Xx",
"DszuLZoo9A7k",
"0mSjSjjylFnR",
"be6n7F9IkdbT",
"UCSP64xskgje",
"vbNakk0fkjTj",
"RtE_Q-H6lJQT",
"NjoOh66Pld1z",
"FIORnEg5kuNd",
"BJHuX_CwkuzM"
],
"name": "DB-GMA-Synthetic-Data-Generation-Customers-Reviews.ipynb",
"private_outputs": true,
"provenance": []
},
"kernelspec": {
"display_name": "Python 3",
"name": "python3"
},
"language_info": {
"name": "python"
}
},
"nbformat": 4,
"nbformat_minor": 0
}