notebooks/generate_synthetic_multimodal_data.ipynb (650 lines of code) (raw):

{ "cells": [ { "cell_type": "code", "source": [ "# Copyright 2025 Google LLC\n", "#\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# https://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ], "metadata": { "id": "6MNyzDPuBBW_" }, "id": "6MNyzDPuBBW_", "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "# Synthetic Multimodal Data Generator\n", "\n", "This notebook shows how to synthesize data for multimodal analytics use cases, and is used to generate the data used in the CleanSight example application.\n", "\n", "This notebook is separate and distinct from the CleanSight application flow (parts 1-3)." ], "metadata": { "id": "iY4DxOJqbcl6" }, "id": "iY4DxOJqbcl6" }, { "cell_type": "markdown", "source": [ "## Load and Anonymize Bus Stops\n", "\n", "Load bus stops from the National Transit database, and anonymize their addresses." ], "metadata": { "id": "j4K6s18ubuWG" }, "id": "j4K6s18ubuWG" }, { "cell_type": "code", "source": [ "PROJECT_ID = \"<your project>\" # @param hide {type:\"string\"}\n", "LOCATION = \"us-central1\" # @param {type:\"string\"}\n", "\n", "BUCKET = 'bus-stops-open-access' # @param {type:\"string\"}\n", "\n", "BQ_DATASET = 'bus_d2ai'\n", "BQ_TABLE = 'staging_ntd_stops'\n", "\n", "STOP_FILE_URI = 'gs://bus-stops-open-access/loader-data/NTAD_National_Transit_Map_Stops_6633473857343365838.csv' # @param {type:\"string\"}" ], "metadata": { "id": "5jw1PmTQ4ReT" }, "id": "5jw1PmTQ4ReT", "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "from google.cloud import bigquery\n", "from google.cloud.bigquery import SchemaField\n", "\n", "# 1. load ntd stop data\n", "\n", "ntd_stops_schema = [\n", " SchemaField('OBJECTID', 'INT64'),\n", " SchemaField('ntd_id', 'STRING'),\n", " SchemaField('stop_id', 'STRING'),\n", " SchemaField('stop_name', 'STRING'),\n", " SchemaField('stop_desc', 'STRING'),\n", " SchemaField('stop_lat', 'FLOAT'),\n", " SchemaField('stop_lon', 'FLOAT'),\n", " SchemaField('zone_id', 'STRING'),\n", " SchemaField('stop_url', 'STRING'),\n", " SchemaField('stop_code', 'STRING'),\n", " SchemaField('location_type', 'STRING'),\n", " SchemaField('parent_station', 'STRING'),\n", " SchemaField('stop_timezone', 'STRING'),\n", " SchemaField('wheelchair_boarding', 'STRING'),\n", " SchemaField('level_id', 'STRING'),\n", " SchemaField('platform_code', 'STRING'),\n", " SchemaField('agency_id', 'STRING'),\n", " SchemaField('download_date', 'STRING'),\n", " SchemaField('x', 'FLOAT'),\n", " SchemaField('y', 'FLOAT')\n", "]\n", "\n", "try:\n", " print(f'creating bq client with {PROJECT_ID} {LOCATION}')\n", " bigquery_client = bigquery.Client(project=PROJECT_ID, location=LOCATION)\n", " dataset = bigquery.Dataset(f'{PROJECT_ID}.{BQ_DATASET}')\n", " dataset.location = LOCATION\n", "\n", " bigquery_client.create_dataset(dataset, timeout=30)\n", "\n", " dataset_ref = bigquery_client.dataset(BQ_DATASET)\n", " table_ref = dataset_ref.table(BQ_TABLE)\n", "\n", " job_config = bigquery.LoadJobConfig(\n", " write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,\n", " source_format=bigquery.SourceFormat.CSV,\n", " skip_leading_rows=1,\n", " schema=ntd_stops_schema,\n", " )\n", " load_job = bigquery_client.load_table_from_uri(\n", " STOP_FILE_URI, table_ref, job_config=job_config\n", " )\n", " load_job.result()\n", "\n", " print('created {}.{}'.format(BQ_DATASET, BQ_TABLE))\n", "\n", "except Exception as e:\n", " print('ntd_stop load failed {}'.format(e))\n", "\n", "# 2. select into bus stop data model\n", "\n", "from google.cloud import storage\n", "\n", "## 2a. anonymize bus stop addresses by replacing them with generated street\n", "## numbers and animal names as the street names\n", "ANIMALS_PATH = 'loader-data/animals.txt' # @param {type:\"string\"}\n", "address_suffixes = ['Circle', 'Square', 'Road', 'Lane', 'Street', 'Avenue', 'Way']\n", "\n", "storage_client = storage.Client()\n", "bucket = storage_client.bucket(BUCKET)\n", "blob = bucket.blob(ANIMALS_PATH)\n", "animals_text = blob.download_as_string().decode(\"utf-8\")\n", "\n", "animals = animals_text.splitlines()\n", "\n", "# this query generates bus stops from the NTD dataset and does the following:\n", "# - anonymizes street names using the animals list and random street numbers\n", "# - randomly decides values for school_zone, seating, and other boolean fields\n", "# - uses st_clusterdbscan() to generate bus_line_ids for bus_stops that are near\n", "# each other\n", "# - convert the stop_lon and stop_lat values into a GEOGRAPHY type using st_geogpoint()\n", "\n", "create_bus_stops_query = f\"\"\"\n", " declare animals array <string>;\n", " declare suffixes array <string>;\n", "\n", " set animals = {animals};\n", " set suffixes = {address_suffixes};\n", "\n", " create or replace table `{PROJECT_ID}.{BQ_DATASET}.bus_stops` as (\n", " select\n", " row_number() over() as bus_stop_id,\n", " *\n", " from (\n", " select\n", " cast((st_clusterdbscan(st_geogpoint(stop_lon, stop_lat), 200, 3) OVER()) as int64) as bus_line_id,\n", " mod(OBJECTID, 10) as stop_num,\n", " concat(\n", " cast(rand() * 10000 as int64),\n", " ' ', animals[cast(rand() * (array_length(animals) - 1) as int64)],\n", " ' ', suffixes[cast(rand() * (array_length(suffixes) - 1) as int64)]\n", " ) as street_address,\n", " (rand() > 0.9) as school_zone,\n", " (rand() > 0.6) as seating,\n", " -1 as num_benches,\n", " (rand() > 0.5) as maps,\n", " (rand() > 0.5) as shelter_ads,\n", " '' as panel_type,\n", " (rand() > 0.2) as lighting,\n", " st_geogpoint(stop_lon, stop_lat) as geom\n", " from `{PROJECT_ID}.{BQ_DATASET}.{BQ_TABLE}`)\n", " where bus_line_id is not null\n", " )\n", "\"\"\"\n", "\n", "# this will generate a new bus_stop record for each of the 65k+ bus stops in the NTD\n", "# dataset and store in a BQ table, minus the stops that were too far away from the\n", "# others to be part of a bus_line. resulting table will have about 50k rows\n", "try:\n", " client = bigquery.Client(project=PROJECT_ID, location=LOCATION)\n", " query_job = client.query(create_bus_stops_query)\n", " results = query_job.result()\n", "\n", "except Exception as e:\n", " print('bus_stop create failed {}'.format(e))\n", "\n" ], "metadata": { "id": "DovMHNxlb2yq" }, "id": "DovMHNxlb2yq", "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Generate Bus Lines\n", "\n", "Bus \"lines\", or routes, aren't used much in our example application, but you can imagine that they would be very important in a real transit application because they would define the stops a particular bus visits, and in which order they are visited.\n", "\n", "This cell uses the values created by `st_clusterdbscan` above as the unique IDs for bus lines to be created." ], "metadata": { "id": "uI8FPLcScCbb" }, "id": "uI8FPLcScCbb" }, { "cell_type": "code", "source": [ "# 3. generate a bus_line for each bus_stop cluster\n", "\n", "PLANTS_PATH = 'loader-data/plants.txt' # @param {type:\"string\"}\n", "line_suffixes = ['Route', 'Line', 'Express']\n", "\n", "storage_client = storage.Client()\n", "bucket = storage_client.bucket(BUCKET)\n", "blob = bucket.blob(PLANTS_PATH)\n", "\n", "# in the NTD dataset, many bus routes are named after the street or neighborhood\n", "# they are located in. We use plant names to anonymize these in the same way we\n", "# used animal names in the previous cell to anonymize bus stop streets.\n", "plants_text = blob.download_as_string().decode(\"utf-8\")\n", "plants = plants_text.splitlines()\n", "\n", "create_bus_lines_query = f\"\"\"\n", " declare plants array <string>;\n", " declare suffixes array <string>;\n", "\n", " set plants = {plants};\n", " set suffixes = {line_suffixes};\n", "\n", " create or replace table `{PROJECT_ID}.{BQ_DATASET}.bus_lines` as (\n", " select\n", " bus_line_id,\n", " concat(\n", " plants[cast(rand() * (array_length(plants) - 1) as int64)],\n", " ' ', suffixes[cast(rand() * (array_length(suffixes) - 1) as int64)]\n", " ) as name,\n", " min(bus_stop_id) as start_bus_stop,\n", " max(bus_stop_id) as end_bus_stop,\n", " count(distinct bus_stop_id) as num_stops\n", " from `{PROJECT_ID}.{BQ_DATASET}.bus_stops`\n", "\n", " group by bus_line_id\n", " )\n", " \"\"\"\n", "try:\n", " client = bigquery.Client()\n", " query_job = client.query(create_bus_lines_query)\n", " results = query_job.result()\n", "\n", "except Exception as e:\n", " print('bus_line create failed {}'.format(e))\n", "\n" ], "metadata": { "id": "CqZErlb5cCua" }, "id": "CqZErlb5cCua", "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Generate Bus Stop Images\n", "\n", "From a small intial set of real photos, you can use Imagen to create variants of those photos based on the prompts you provide. For example, you can instruct Imagen to add snow or people to the bus stop in order to evaluate a wider variety of situations.\n", "\n", "`DEMO_MODE` is an optional flag that can be used to generate a smaller subset of data. Leave this unchecked." ], "metadata": { "id": "i3myflHccaW8" }, "id": "i3myflHccaW8" }, { "cell_type": "code", "source": [ "SOURCE_FOLDER = 'source-images'\n", "EDITED_FOLDER = 'edited-images'\n", "DEMO_MODE = False # @param {type:\"boolean\"}\n", "DEMO_RANGE = 3 # used to determine % of prompts and number of images to process\n", "DEFAULT_RANGE = 100 # used to determine % of prompts" ], "metadata": { "id": "QVw4tDKVcz7q" }, "id": "QVw4tDKVcz7q", "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "### Object tables\n", "\n", "This process also makes use of object tables; you can either create a new connection, or re-use the existing connection that you already have from the CleanSight application." ], "metadata": { "id": "7PbJAjeSc_hj" }, "id": "7PbJAjeSc_hj" }, { "cell_type": "code", "source": [ "!bq mk --connection --location=$LOCATION \\\n", " --connection_type=CLOUD_RESOURCE gcs_stop_images_cxn" ], "metadata": { "id": "-vDagGzKc0Ay" }, "id": "-vDagGzKc0Ay", "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "from google.cloud import bigquery\n", "\n", "BQ_SOURCE_TABLE = 'source_stop_images_ot'\n", "BQ_EDITED_TABLE = 'edited_stop_images_ot'\n", "\n", "source_object_table_sql = f\"\"\"\n", " create or replace external table `{PROJECT_ID}.{BQ_DATASET}.{BQ_SOURCE_TABLE}`\n", " with connection `{PROJECT_ID}.{LOCATION}.gcs_stop_images_cxn`\n", " options (\n", " object_metadata = 'SIMPLE',\n", " uris = ['gs://{BUCKET}/{SOURCE_FOLDER}/*']\n", " )\n", "\"\"\"\n", "\n", "edited_object_table_sql = f\"\"\"\n", " create or replace external table `{PROJECT_ID}.{BQ_DATASET}.{BQ_EDITED_TABLE}`\n", " with connection `{PROJECT_ID}.{LOCATION}.gcs_stop_images_cxn`\n", " options (\n", " object_metadata = 'SIMPLE',\n", " uris = ['gs://{BUCKET}/{EDITED_FOLDER}/*']\n", " )\n", "\"\"\"\n", "\n", "try:\n", " client = bigquery.Client()\n", " query_job = client.query(source_object_table_sql)\n", " query_job.result()\n", "\n", " query_job = client.query(edited_object_table_sql)\n", " query_job.result()\n", "\n", "except Exception as e:\n", " print('object table creation failed {}'.format(e))\n", "\n", "# get list of objects\n", "client = bigquery.Client()\n", "\n", "ot_sql = f'select * from `{PROJECT_ID}.{BQ_DATASET}.{BQ_SOURCE_TABLE}`'\n", "\n", "if DEMO_MODE:\n", " ot_sql += f' LIMIT {DEMO_RANGE}'\n", "\n", "query_job = client.query(ot_sql)\n", "rows = query_job.result()\n", "\n", "source_images = list(rows)\n", "if DEMO_MODE:\n", " source_images = source_images[0:DEMO_RANGE]\n", "\n", "print(len(source_images))" ], "metadata": { "id": "hWKaNL2FdK4z" }, "id": "hWKaNL2FdK4z", "execution_count": null, "outputs": [] }, { "cell_type": "code", "source": [ "%%bigquery\n", "\n", "LOAD DATA OVERWRITE `bus_d2ai.bus_stop_image_mappings`\n", "FROM FILES (\n", " format = 'JSON',\n", " uris = ['gs://bus-stops-open-access/loader-data/bus_stop_image_mappings.json']);\n", "\n", "create or replace table bus_d2ai.image_gen_prompts (\n", " prompt_text string,\n", " prompt_type string\n", ");\n", "\n", "insert into bus_d2ai.image_gen_prompts (prompt_text, prompt_type) values\n", " ('add a low hanging power line in the bus stop area, making it difficult to get on and off the bus', 'Safety'),\n", " ('add a water leak in the bus stop area', 'Safety'),\n", " ('add construction zone and 1 construction vehicle in the bus stop area', 'Safety'),\n", " ('add a thick layer of snow covering the ground and partially covering the bus area', 'Safety'),\n", " ('add a small pile of garbage in the bus stop area', 'Cleanliness'),\n", " ('add graffiti on the side of the bus stop', 'Cleanliness'),\n", " ('add sleeping bag on bus bench', 'Cleanliness'),\n", " ('add 1 bottle of water, 1 bottle of juice, 1 can of beer to bus stop', 'Cleanliness'),\n", " ('add 1 person waiting for bus, either sitting or standing', 'People'),\n", " ('add 2 people waiting for bus, either sitting or standing', 'People'),\n", " ('add 3 people waiting for bus, one person sitting and the other two standing', 'People'),\n", " ('add 1-2 joggers passing by', 'People'),\n", " ('add a service dog with his owner', 'People'),\n", " ('add a construction worker', 'People'),\n", " ('add a billboard at the bus stop, advertising game day tickets', 'Advertisement'),\n", " ('add a billboard at the bus stop, advertising continuing education for adults', 'Advertisement'),\n", " ('add a billboard at the bus stop, advertising Burger King', 'Advertisement'),\n", " ('add a billboard at the bus stop, advertising Insomnia Cookies', 'Advertisement'),\n", " ('add a small collection of 5 small pieces of garbage scattered over a wide area', 'Augmentation'),\n", " ('add a small pile of garbage on the street', 'Augmentation'),\n", " ('add small shopping cart near the bus stop filled with junk', 'Augmentation'),\n", " ('add street cones in the bus stop area', 'Augmentation'),\n", " ('add trash can near bus stop area', 'Augmentation'),\n", " ('add a newspaper box next to the bus stop', 'Augmentation'),\n", " ('add 2-5 ebikes next to the bus stop', 'Augmentation'),\n", " ('add a yard sign with a public notice on it', 'Augmentation'),\n", " ('add a yard sign with a freeze warning', 'Augmentation'),\n", " ('change time of day to late evening', 'Augmentation'),\n", " ('change time of day to early morning', 'Augmentation'),\n", " ('change time of day to noon', 'Augmentation'),\n", " ('add a thick layer of snow covering the ground', 'Augmentation'),\n", " ('add thick layer of fog over bus stop area, reducing visibility', 'Augmentation');" ], "metadata": { "id": "f8I4_O40dkpu" }, "id": "f8I4_O40dkpu", "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Generate a date range\n", "\n", "Photos are taken at a particular time and place. We know the places because the bus_stop table includes locations (longitude, latitude). Here, we generate a range of times to attach to the image metadata." ], "metadata": { "id": "PwqXA-wx-Zue" }, "id": "PwqXA-wx-Zue" }, { "cell_type": "code", "source": [ "from datetime import date, timedelta\n", "import numpy as np\n", "\n", "start_date = date(2025, 1, 1)\n", "end_date = date(2025, 2, 15)\n", "print(\"date range is \" + str(start_date) + \" - \" + str(end_date))\n", "\n", "dates_between = end_date - start_date\n", "total_days = dates_between.days\n", "\n", "def gen_random_dates(num_dates):\n", " randays = np.random.choice(total_days, num_dates, replace=False)\n", " results = [start_date + timedelta(days=int(day)) for day in randays]\n", " return results" ], "metadata": { "id": "EhINIRVHdrrK" }, "id": "EhINIRVHdrrK", "execution_count": null, "outputs": [] }, { "cell_type": "markdown", "source": [ "## Use Imagen to generate photo variants\n", "\n", "**Please Note** as of March 2025 you need to fill out [this form](https://docs.google.com/forms/d/e/1FAIpQLScN9KOtbuwnEh6pV7xjxib5up5kG_uPqnBtJ8GcubZ6M3i5Cw/viewform) in order to get access to the Imagen model for editing in Vertex.\n", "\n", "The trick here is to use `SubjectReferenceImage` with `subject_description=\"bus stop\"` in order to prime the model to know what it's supposed to be looking at. Then you can prompt it to make the edit by calling `_generate_images`, providing a `reference_images`, and it will generate a new image based on the original \"subject\" image.\n", "\n", "Iterating over the collection of input photos cross-joined with the list of prompts specificed above (in the `image_gen_prompts` table) quickly produces a large amount of data that can be used for analysis!" ], "metadata": { "id": "fdwo8a_AdYn_" }, "id": "fdwo8a_AdYn_" }, { "cell_type": "code", "source": [ "import vertexai\n", "\n", "vertexai.init(project=PROJECT_ID, location=LOCATION)\n", "\n", "from vertexai import generative_models\n", "from vertexai.preview.vision_models import (\n", " ControlReferenceImage,\n", " Image,\n", " ImageGenerationModel,\n", " MaskReferenceImage,\n", " RawReferenceImage,\n", " SubjectReferenceImage\n", ")\n", "\n", "edit_model = ImageGenerationModel.from_pretrained('imagen-3.0-capability-001')\n", "\n", "from google.cloud import storage\n", "from google.cloud import bigquery\n", "import uuid\n", "\n", "storage_client = storage.Client()\n", "bucket = storage_client.bucket(BUCKET)\n", "\n", "bq_client = bigquery.Client(location=LOCATION)\n", "\n", "edited_images = []\n", "\n", "for image_row in source_images:\n", " gcs_uri = image_row.uri\n", " image_name = gcs_uri.split('/')[4].split('.')[0]\n", " print('processing', image_name)\n", "\n", " # look up the bus stop id\n", " bus_stop_id = None\n", " sql = f\"select bus_stop_id from bus_d2ai.bus_stop_image_mappings where image_name = '{image_name}'\"\n", " rows = bq_client.query_and_wait(sql)\n", " for row in rows:\n", " bus_stop_id = row[0]\n", "\n", " if bus_stop_id == None:\n", " sql = f\"select bus_stop_id from bus_d2ai.bus_stop_image_mappings where image_name is null order by bus_stop_id limit 1\"\n", " rows = bq_client.query_and_wait(sql)\n", " for row in rows:\n", " bus_stop_id = row[0]\n", "\n", " if bus_stop_id == None:\n", " print('Error finding available bus stop id')\n", " quit()\n", "\n", " sql = f\"update bus_d2ai.bus_stop_image_mappings set image_name = '{image_name}' where bus_stop_id = {bus_stop_id}\"\n", " bq_client.query_and_wait(sql)\n", "\n", " print(f'bus_stop_id {bus_stop_id} assigned to {image_name}')\n", "\n", " # retrieve some number of prompts by sampling the image_gen_prompts table\n", " image_prompts = []\n", " if DEMO_MODE:\n", " sql = f\"select prompt_text from bus_d2ai.image_gen_prompts tablesample system ({DEMO_RANGE} percent)\"\n", " else:\n", " sql = f\"select prompt_text from bus_d2ai.image_gen_prompts tablesample system ({DEFAULT_RANGE} percent)\"\n", "\n", " rows = client.query_and_wait(sql)\n", "\n", " for row in rows:\n", " image_prompts.append(row[0])\n", " print(f'retrieved {len(image_prompts)} image_prompts:', image_prompts)\n", "\n", " # generate random event dates within a date range\n", " # want the number of event dates to equal the number of prompts\n", " num_dates = len(image_prompts)\n", " event_dates = gen_random_dates(num_dates)\n", " print(f'generated {len(event_dates)} event_dates:', event_dates)\n", "\n", " for i, prompt in enumerate(image_prompts):\n", "\n", " event_date = str(event_dates[i])\n", "\n", " try:\n", " ref_image = Image(gcs_uri = gcs_uri)\n", " raw = RawReferenceImage(image=ref_image, reference_id=1)\n", "\n", " subject = SubjectReferenceImage(image=ref_image, reference_id=1, subject_type='default', subject_description='bus stop')\n", "\n", " print(f'generating variants for source image {gcs_uri}...')\n", " edited_image_response = edit_model._generate_images(\n", " prompt=prompt,\n", " reference_images=[subject],\n", " number_of_images=1,\n", " safety_filter_level='block_few',\n", " person_generation='allow_adult',\n", " aspect_ratio='4:3'\n", " )\n", "\n", " edited_image_metadata = {\n", " 'source_image_uri': gcs_uri,\n", " 'image_gen_prompt': prompt,\n", " 'bus_stop_id': bus_stop_id,\n", " 'event_date': event_date\n", " }\n", " for edited_image in edited_image_response:\n", " edited_image_id = str(bus_stop_id) + '-' + ''.join(str(uuid.uuid4()).split('-')[0:3])\n", " edited_image_name = f'{edited_image_id}.jpg'\n", " edited_image_metadata.update({ 'image_id': edited_image_id })\n", "\n", " blob = bucket.blob(f'{EDITED_FOLDER}/{event_date}/{edited_image_name}')\n", " blob.metadata = edited_image_metadata\n", " blob.upload_from_string(edited_image._image_bytes, \"image/jpg\")\n", " print(f'Uploaded edited image {edited_image_name} generated from source image {gcs_uri}')\n", "\n", " except Exception as e:\n", " print('image generation or upload failed; skipping {}'.format(e))\n", "\n" ], "metadata": { "id": "ceUict04dZCK" }, "id": "ceUict04dZCK", "execution_count": null, "outputs": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.10" }, "colab": { "provenance": [], "name": "generate_synthetic_multimodal_data.ipynb", "private_outputs": true } }, "nbformat": 4, "nbformat_minor": 5 }