sql-scripts/rideshare_lakehouse_enriched/sp_unstructured_data_analysis.sql (141 lines of code) (raw):
/*##################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################################################*/
/*
Use Cases:
- Perform AI/ML on unstructured data stored in a data lake
- Be able to query your data lake files with SQL
- Filter files by last updated and/or metadata
- Apply row-level security to the files that users can view
Description:
- Customers have images, PDF and other types of data in their lakes.
- Customers want to gain insights into this data, which is typically operated on at a file level
- Users want an easy way to navigate and find items on data lakes
Show:
- The scoring of each item identified in each image.
References:
- https://cloud.google.com/bigquery/docs/object-table-introduction
- https://cloud.google.com/vision/docs/drag-and-drop
Clean up / Reset script:
DROP FUNCTION IF EXISTS `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_localize_objects`;
DROP FUNCTION IF EXISTS `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_labels`;
DROP FUNCTION IF EXISTS `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_landmarks`;
DROP FUNCTION IF EXISTS `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_logos`;
DROP TABLE IF EXISTS `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.bigquery_rideshare_images_ml_score`;
*/
-- Script-level variables for the batch scoring loop further down:
--   loopCounter : advanced by the batch size (10) after every pass of the loop
--   rowCount    : number of JPEG objects in the object table, set just before the loop
DECLARE loopCounter, rowCount INT64 DEFAULT 0;
-- Browse the objects held in GCS / the data lake.
-- The metadata values on each object record where the image was taken.
SELECT *
  FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_images`;
-- Link BigQuery to the Cloud Function: a remote function that takes a URI string,
-- calls the endpoint with user-defined context mode = "localize_objects_uri",
-- and returns the response as JSON.
-- NOTE(review): unlike the other ext_udf_ai_* functions in this script, this one
-- does not set max_batching_rows -- confirm that is intentional.
CREATE OR REPLACE FUNCTION `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_localize_objects`
(
    uri STRING
)
RETURNS JSON
REMOTE WITH CONNECTION `${project_id}.${bigquery_region}.cloud-function`
OPTIONS (
    user_defined_context = [("mode","localize_objects_uri")],
    endpoint             = 'https://${cloud_function_region}-${project_id}.cloudfunctions.net/bigquery_external_function'
);
-- Remote function: takes a URI string, calls the Cloud Function endpoint with
-- user-defined context mode = "detect_labels_uri", and returns JSON.
-- max_batching_rows = 1 caps each HTTP request to the endpoint at a single row.
CREATE OR REPLACE FUNCTION `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_labels`
(
    uri STRING
)
RETURNS JSON
REMOTE WITH CONNECTION `${project_id}.${bigquery_region}.cloud-function`
OPTIONS (
    max_batching_rows    = 1,
    user_defined_context = [("mode","detect_labels_uri")],
    endpoint             = 'https://${cloud_function_region}-${project_id}.cloudfunctions.net/bigquery_external_function'
);
-- Remote function: takes a URI string, calls the Cloud Function endpoint with
-- user-defined context mode = "detect_landmarks_uri", and returns JSON.
-- max_batching_rows = 1 caps each HTTP request to the endpoint at a single row.
CREATE OR REPLACE FUNCTION `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_landmarks`
(
    uri STRING
)
RETURNS JSON
REMOTE WITH CONNECTION `${project_id}.${bigquery_region}.cloud-function`
OPTIONS (
    max_batching_rows    = 1,
    user_defined_context = [("mode","detect_landmarks_uri")],
    endpoint             = 'https://${cloud_function_region}-${project_id}.cloudfunctions.net/bigquery_external_function'
);
-- Remote function: takes a URI string, calls the Cloud Function endpoint with
-- user-defined context mode = "detect_logos_uri", and returns JSON.
-- max_batching_rows = 1 caps each HTTP request to the endpoint at a single row.
CREATE OR REPLACE FUNCTION `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_logos`
(
    uri STRING
)
RETURNS JSON
REMOTE WITH CONNECTION `${project_id}.${bigquery_region}.cloud-function`
OPTIONS (
    max_batching_rows    = 1,
    user_defined_context = [("mode","detect_logos_uri")],
    endpoint             = 'https://${cloud_function_region}-${project_id}.cloudfunctions.net/bigquery_external_function'
);
-- Destination table for the ML scoring results produced by this script.
-- NOTE(review): CREATE OR REPLACE discards any rows scored by a previous run, so
-- every image is scored again each time the script executes -- confirm intended.
CREATE OR REPLACE TABLE `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.bigquery_rideshare_images_ml_score`
(
    uri                         STRING,     -- object URI copied from the object table
    location_id                 INT64,      -- parsed from the object's 'location_id' metadata entry
    image_date                  DATE,       -- parsed from the object's 'image_date' metadata entry
    vision_ai_localize_objects  JSON,       -- result of ext_udf_ai_localize_objects(uri)
    vision_ai_detect_labels     JSON,       -- result of ext_udf_ai_detect_labels(uri)
    vision_ai_detect_landmarks  JSON,       -- result of ext_udf_ai_detect_landmarks(uri)
    vision_ai_detect_logos      JSON,       -- result of ext_udf_ai_detect_logos(uri)
    updated                     TIMESTAMP   -- object table 'updated' value at scoring time
)
CLUSTER BY uri;
/* Testing with sample images
SELECT `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_localize_objects`('gs://cloud-samples-data/vision/object_localization/duck_and_truck.jpg')
SELECT `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_labels`('gs://cloud-samples-data/vision/object_localization/duck_and_truck.jpg')
SELECT `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_landmarks`('gs://cloud-samples-data/vision/object_localization/duck_and_truck.jpg')
SELECT `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_logos`('gs://cloud-samples-data/vision/object_localization/duck_and_truck.jpg')
*/
--!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-- ALERT: You need to make sure there is data in the table
-- Object tables can take a few minutes to sync
-- During the deployment the Airflow job waits for data in this table, before calling this procedure
--!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
-- Score all the new items (that have not been scored) in batches of 10
-- DECLARE loopCounter INT64 DEFAULT 0; DECLARE rowCount INT64 DEFAULT 0;
-- Count the JPEG objects once, up front; the scoring loop compares its running
-- counter against this value to decide when to stop.
SET rowCount = (
    SELECT COUNT(*)
      FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_images`
     WHERE content_type = 'image/jpeg'
);
-- Batch scoring loop.
-- Each pass scores up to 10 JPEG objects that have no row with the same
-- (uri, updated) pair in the score table yet, calling the four remote functions
-- once per image. The loop ends when nothing is pending, or when loopCounter
-- (advanced by 10 per pass) exceeds rowCount, which guards against looping
-- forever if a pass fails to shrink the pending set.
-- The batch size 10 appears twice below (row filter and counter step); keep them in sync.
LOOP
  INSERT INTO `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.bigquery_rideshare_images_ml_score`
    (uri, location_id, image_date, vision_ai_localize_objects, vision_ai_detect_labels,
     vision_ai_detect_landmarks, vision_ai_detect_logos, updated)
  WITH Data AS
  (
    -- Pending images numbered oldest-first. uri breaks ties between objects that
    -- share an updated timestamp, so the batch contents are deterministic.
    SELECT ObjectTable.uri,
           ObjectTable.metadata,
           ObjectTable.updated,
           ROW_NUMBER() OVER (ORDER BY ObjectTable.updated, ObjectTable.uri) AS RowNumber
      FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_images` AS ObjectTable
     WHERE ObjectTable.content_type = 'image/jpeg'
       AND NOT EXISTS (SELECT 1
                         FROM `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.bigquery_rideshare_images_ml_score` AS MLScoreTable
                        WHERE ObjectTable.uri     = MLScoreTable.uri
                          AND ObjectTable.updated = MLScoreTable.updated)
  )
  SELECT Data.uri,
         -- Look the metadata entry up by name instead of by array position, so the
         -- value is found regardless of how many entries the object carries or in
         -- which order they appear. No matching entry (or empty/NULL metadata)
         -- yields NULL. The CAST sits outside the subquery so it is only ever
         -- applied to the selected entry's value.
         CAST((SELECT m.value
                 FROM UNNEST(Data.metadata) AS m WITH OFFSET AS pos
                WHERE m.name = 'location_id'
                ORDER BY pos
                LIMIT 1) AS INT64) AS location_id,
         CAST((SELECT m.value
                 FROM UNNEST(Data.metadata) AS m WITH OFFSET AS pos
                WHERE m.name = 'image_date'
                ORDER BY pos
                LIMIT 1) AS DATE) AS image_date,
         `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_localize_objects`(Data.uri),
         `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_labels`(Data.uri),
         `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_landmarks`(Data.uri),
         `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.ext_udf_ai_detect_logos`(Data.uri),
         Data.updated
    FROM Data
   WHERE Data.RowNumber BETWEEN 1 AND 10;

  SET loopCounter = loopCounter + 10;

  -- Stop once no unscored image remains, or once the counter has passed the
  -- number of images counted before the loop started.
  IF loopCounter > rowCount
     OR NOT EXISTS (SELECT 1
                      FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_images` AS ObjectTable
                     WHERE ObjectTable.content_type = 'image/jpeg'
                       AND NOT EXISTS (SELECT 1
                                         FROM `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.bigquery_rideshare_images_ml_score` AS MLScoreTable
                                        WHERE ObjectTable.uri     = MLScoreTable.uri
                                          AND ObjectTable.updated = MLScoreTable.updated)) THEN
    LEAVE;
  END IF;
END LOOP;
-- Inspect the raw scoring output (every column of the score table, in table order)
SELECT uri,
       location_id,
       image_date,
       vision_ai_localize_objects,
       vision_ai_detect_labels,
       vision_ai_detect_landmarks,
       vision_ai_detect_logos,
       updated
  FROM `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.bigquery_rideshare_images_ml_score`;
-- Flatten the four Vision AI JSON payloads into one row per detected item.
-- Output columns: uri, detection_type ('Object' | 'Label' | 'Landmark' | 'Logo'),
-- location_id, image_date, name, score.
--   * name  : the item's "name" (objects) or "description" (labels, landmarks, logos)
--             as a string, with any double-quote characters removed.
--   * score : the item's "score" member, returned as a JSON value.
-- An image contributes no rows for a payload whose annotation array yields no elements.
WITH score_ai AS
(
  SELECT uri,
         location_id,
         image_date,
         vision_ai_localize_objects,
         vision_ai_detect_labels,
         vision_ai_detect_landmarks,
         vision_ai_detect_logos
    FROM `${project_id}.${bigquery_rideshare_lakehouse_enriched_dataset}.bigquery_rideshare_images_ml_score`
)
SELECT score_ai.uri,
       'Object' AS detection_type,
       score_ai.location_id,
       score_ai.image_date,
       REPLACE(JSON_VALUE(item.name), '"', '') AS name,
       item.score
  FROM score_ai
 CROSS JOIN UNNEST(JSON_QUERY_ARRAY(score_ai.vision_ai_localize_objects.localized_object_annotations)) AS item
UNION ALL
SELECT score_ai.uri,
       'Label' AS detection_type,
       score_ai.location_id,
       score_ai.image_date,
       REPLACE(JSON_VALUE(item.description), '"', '') AS name,
       item.score
  FROM score_ai
 CROSS JOIN UNNEST(JSON_QUERY_ARRAY(score_ai.vision_ai_detect_labels.label_annotations)) AS item
UNION ALL
SELECT score_ai.uri,
       'Landmark' AS detection_type,
       score_ai.location_id,
       score_ai.image_date,
       REPLACE(JSON_VALUE(item.description), '"', '') AS name,
       item.score
  FROM score_ai
 CROSS JOIN UNNEST(JSON_QUERY_ARRAY(score_ai.vision_ai_detect_landmarks.landmark_annotations)) AS item
UNION ALL
SELECT score_ai.uri,
       'Logo' AS detection_type,
       score_ai.location_id,
       score_ai.image_date,
       REPLACE(JSON_VALUE(item.description), '"', '') AS name,
       item.score
  FROM score_ai
 CROSS JOIN UNNEST(JSON_QUERY_ARRAY(score_ai.vision_ai_detect_logos.logo_annotations)) AS item
-- Order by named output columns (not ordinal position); detection_type and name
-- make the ordering within a single image stable.
ORDER BY uri, detection_type, name;