/*##################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################################################*/
/*
YouTube:
- https://youtu.be/bMS4p2XHMpE
Use Cases:
- General overview of BigQuery recent features
Running this demo:
- Start the Spanner Airflow DAG: sample-bigquery-start-spanner (takes about 10 minutes to deploy and shuts down after 4 hours)
- Start the Datastream Airflow DAG: sample-datastream-private-ip-deploy (takes about 10 minutes to deploy)
- Start the Change Data Capture (generate data) DAG: sample-datastream-private-ip-generate-data (starts immediately)
- Start the Dataflow Airflow DAG: sample-dataflow-start-streaming-job (takes about 10 minutes to deploy and shuts down after 4 hours)
- Upload the notebook from the code bucket to colab enterprise
- You should run the notebook: BigQuery-Create-TensorFlow-Model.ipynb
- Run the Iceberg table DAG: sample-iceberg-create-tables-update-data (takes about 15 minutes to deploy)
- Start the Data Quality DAG: sample-rideshare-run-data-quality (takes about 10 minutes to complete)
- Log into https://atomfashion.io/analytics/salesoverview and switch to Premium Account
- Run the below "DO THIS IN ADVANCE"
- Run a manual Data Profile of table: ${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip
- https://console.cloud.google.com/dataplex/govern/profile?project=${project_id}
Clean up / Reset script:
-- This table was put in the taxi dataset since we need to grant Pub/Sub permissions to access it
DROP TABLE IF EXISTS `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`;
DROP EXTERNAL TABLE IF EXISTS `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.load_data_taxi_trips_csv`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.biglake_taxi_trips_parquet`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.biglake_green_trips_parquet`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.omni_aws_taxi_rate_code`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigquery_rideshare_zone`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigquery_time_travel`;
DROP VIEW IF EXISTS `${project_id}.technical_demo.taxi_lineage`;
-- Drop the search index before its table, and all objects before the schema
DROP SEARCH INDEX IF EXISTS idx_all_bigsearch_log_5b_5t_json_hourly ON `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_NOT_INDEXED`;
DROP TABLE IF EXISTS `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`;
DROP MODEL IF EXISTS `${project_id}.technical_demo.model_churn`;
DROP MODEL IF EXISTS `${project_id}.technical_demo.model_tf_linear_regression_fare`;
DROP EXTERNAL TABLE IF EXISTS `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
DROP EXTERNAL TABLE IF EXISTS `${project_id}.technical_demo.biglake_rideshare_trip_iceberg`;
DROP EXTERNAL TABLE IF EXISTS `${project_id}.technical_demo.biglake_unstructured_data`;
DROP SCHEMA IF EXISTS `${project_id}.technical_demo` CASCADE;
*/
-- ***************************************************************************************************
-- BEGIN: DO THIS IN ADVANCE (some items need some time to fully initialize)
-- ***************************************************************************************************
-- Create temporary dataset
CREATE SCHEMA `${project_id}.technical_demo`
OPTIONS (
location = "${bigquery_region}"
);
-- Copy the data from the shared project
CREATE TABLE `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_NOT_INDEXED`
COPY `${shared_demo_project_id}.data_analytics_shared_data.bigsearch_log_5b_5t_json_hourly`;
CREATE TABLE `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`
COPY `${shared_demo_project_id}.data_analytics_shared_data.bigsearch_log_5b_5t_json_hourly`;
-- This can take some time to create the index
CREATE SEARCH INDEX idx_all_bigsearch_log_5b_5t_json_hourly
ON `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED` (ALL COLUMNS)
OPTIONS (analyzer = 'LOG_ANALYZER');
-- It can take a few minutes for data lineage to be created
CREATE OR REPLACE VIEW `${project_id}.technical_demo.taxi_lineage` AS
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_json` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id
UNION ALL
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_parquet` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id ;
-- ***************************************************************************************************
-- END: DO THIS IN ADVANCE
-- ***************************************************************************************************
--------------------------------------------------------------------------------------------------------
-- PART 1: See all the data - Any cloud, any speed
--------------------------------------------------------------------------------------------------------
/*
- Load data w/variety of file formats
- Create external tables over different file formats
- Parquet
- Parquet w/Hive partitioning
- Iceberg
- Query and Transfer AWS data
- Dataflow Streaming Data
- BQ Subscriptions
- DataStream
- Spanner
*/
-- * Load data w/variety of file formats *
-- Load data into BigQuery (AVRO, ORC, CSV, JSON, Parquet, Iceberg)
LOAD DATA OVERWRITE `${project_id}.technical_demo.load_data_taxi_trips_csv`
FROM FILES (
format = 'CSV',
skip_leading_rows = 1,
field_delimiter=',',
null_marker='',
uris = ['gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/csv/year=2021/month=12/*.csv']);
SELECT * FROM `${project_id}.technical_demo.load_data_taxi_trips_csv`;
-- * Create external tables over different file formats *
-- Query External Parquet data
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_taxi_trips_parquet`
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
format = "PARQUET",
uris = ['gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_trip/parquet/*.parquet']
);
SELECT * FROM `${project_id}.technical_demo.biglake_taxi_trips_parquet` LIMIT 1000;
-- Query External Parquet data w/Hive Partitioning
-- https://console.cloud.google.com/storage/browser/${processed_bucket_name};tab=objects?forceOnBucketsSortingFiltering=false&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_green_trips_parquet`
WITH PARTITION COLUMNS (
year INTEGER,
month INTEGER
)
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
format = "PARQUET",
hive_partition_uri_prefix = "gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/parquet/",
uris = ['gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/parquet/*.parquet']
);
SELECT * FROM `${project_id}.technical_demo.biglake_green_trips_parquet` LIMIT 1000;
-- Query External Iceberg data
-- OPEN: https://console.cloud.google.com/storage/browser/rideshare-lakehouse-enriched-${random_extension}/rideshare_iceberg_catalog/rideshare_lakehouse_enriched.db/biglake_rideshare_payment_type_iceberg/metadata?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22))&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_rideshare_trip_iceberg`
OPTIONS (
format = 'ICEBERG',
uris = ["gs://${gcs_rideshare_lakehouse_enriched_bucket}/rideshare_iceberg_catalog/rideshare_lakehouse_enriched.db/biglake_rideshare_payment_type_iceberg/metadata/REPLACE-ME.metadata.json"]
);
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_trip_iceberg`;
-- * Query and Transfer AWS data *
-- https://cloud.google.com/bigquery/docs/load-data-using-cross-cloud-transfer
-- Create the AWS tables
CALL `${project_id}.${aws_omni_biglake_dataset_name}.sp_demo_aws_omni_create_tables`();
-- Load Data from AWS (or Azure) - Query data in a remote cloud and create a local BigQuery table
CREATE OR REPLACE TABLE `${project_id}.technical_demo.omni_aws_taxi_rate_code` AS
SELECT * FROM `${project_id}.${aws_omni_biglake_dataset_name}.taxi_s3_rate_code`;
SELECT * FROM `${project_id}.technical_demo.omni_aws_taxi_rate_code`;
-- * Dataflow Streaming Data *
-- Ingest streaming data (Open Dataflow, we can use custom code and/or templates)
-- https://console.cloud.google.com/dataflow/jobs?project=${project_id}
SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` LIMIT 1000;
SELECT COUNT(*) AS RecordCount
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming`
WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 1 MINUTE);
-- * BQ Subscriptions *
CREATE OR REPLACE TABLE `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`
(
data STRING
)
PARTITION BY TIMESTAMP_TRUNC(_PARTITIONTIME, HOUR);
-- Open: https://console.cloud.google.com/cloudpubsub/subscription/list?project=${project_id}
-- projects/pubsub-public-data/topics/taxirides-realtime
-- grant service account: service-31541164526@gcp-sa-pubsub.iam.gserviceaccount.com
SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub` LIMIT 1000;
SELECT COUNT(*) AS Cnt FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`;
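-- Parse the raw JSON payload with JSON functions. A minimal sketch: the field
-- names (ride_id, ride_status, meter_reading) are assumptions based on the
-- public taxirides-realtime topic and may need adjusting.
SELECT JSON_VALUE(data, '$.ride_id')     AS ride_id,
       JSON_VALUE(data, '$.ride_status') AS ride_status,
       SAFE_CAST(JSON_VALUE(data, '$.meter_reading') AS FLOAT64) AS meter_reading
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_pub_sub`
LIMIT 100;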
-- REMEMBER TO DELETE YOUR Pub/Sub TO BIGQUERY JOB
-- * Datastream *
-- https://console.cloud.google.com/datastream/streams?project=${project_id}
SELECT * FROM `${project_id}.datastream_private_ip_public.driver`;
SELECT * FROM `${project_id}.datastream_private_ip_public.review`;
SELECT * FROM `${project_id}.datastream_private_ip_public.payment`;
-- * Spanner Federated Query *
-- https://console.cloud.google.com/spanner/instances/spanner-ral9vzfqhi/databases/weather/details/tables?project=${project_id}
-- Spanner (data)
EXECUTE IMMEDIATE """
SELECT *
FROM EXTERNAL_QUERY(
'projects/${project_id}/locations/${spanner_region}/connections/bq_spanner_connection',
"SELECT * FROM weather WHERE station_id='USW00094728'");
""";
--------------------------------------------------------------------------------------------------------
-- PART 2: Trust it all - Data lineage, dataplex, data quality
--------------------------------------------------------------------------------------------------------
/*
- Lineage via Console and Views
- Managing with Dataplex
- Securing with Dataplex
- Registration with Dataplex
- Data Profiling metrics
- Data Quality metrics
- Security (RLS/CLS/DM to both internal/BigLake tables)
*/
-- * Lineage via Console and Views *
-- Create a view and show the lineage
-- Show: ${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_model_training_data
-- Shows Analytics Hub data
-- https://console.cloud.google.com/dataplex/search?project=${project_id}&q=${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_model_training_data
-- Lineage from storage through tables and views
-- This was done in advance
/*
CREATE VIEW `${project_id}.technical_demo.taxi_lineage` AS
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_json` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id
UNION ALL
SELECT CAST(rideshare_trip_id AS STRING) AS rideshare_trip_id,
CAST(pickup_location_id AS INTEGER) AS pickup_location_id,
zone_pickup.borough AS pickup_borough,
CAST(pickup_datetime AS TIMESTAMP) AS pickup_datetime,
CAST(dropoff_location_id AS INTEGER) AS dropoff_location_id,
zone_dropoff.borough AS dropoff_borough,
CAST(dropoff_datetime AS TIMESTAMP) AS dropoff_datetime,
CAST(ride_distance AS FLOAT64) AS ride_distance,
CAST(is_airport AS BOOLEAN) AS is_airport,
payment.payment_type_description,
CAST(fare_amount AS FLOAT64) AS fare_amount,
CAST(tip_amount AS FLOAT64) AS tip_amount,
CAST(taxes_amount AS FLOAT64) AS taxes_amount,
CAST(total_amount AS FLOAT64) AS total_amount
FROM `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_trip_parquet` AS trip
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_payment_type_json` AS payment
ON CAST(trip.payment_type_id AS INT64) = payment.payment_type_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_pickup
ON CAST(trip.pickup_location_id AS INT64) = zone_pickup.location_id
INNER JOIN `${project_id}.${bigquery_rideshare_lakehouse_raw_dataset}.biglake_rideshare_zone_csv` zone_dropoff
ON CAST(trip.dropoff_location_id AS INT64) = zone_dropoff.location_id ;
*/
SELECT * FROM `${project_id}.technical_demo.taxi_lineage` LIMIT 100;
-- * Managing with Dataplex *
-- https://console.cloud.google.com/dataplex/lakes?project=${project_id}
-- Show Dataplex Lakes (Managed View)
-- * Security with Dataplex *
-- https://console.cloud.google.com/dataplex/secure?project=${project_id}
-- Show Dataplex Security (Securing Assets within BigQuery / Storage as one)
-- * Registration with Dataplex *
-- Show tables in storage
-- Show the tables within BigQuery
-- https://console.cloud.google.com/storage/browser/rideshare-lakehouse-raw-${random_extension};tab=objects?forceOnBucketsSortingFiltering=false&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
SELECT * FROM `${project_id}.rideshare_raw_zone_${random_extension}.rideshare_payment_type` LIMIT 1000;
SELECT * FROM `${project_id}.rideshare_raw_zone_${random_extension}.rideshare_trip_json` LIMIT 1000;
SELECT * FROM `${project_id}.rideshare_raw_zone_${random_extension}.rideshare_trip_parquet` LIMIT 1000;
-- * Data Profiling metrics *
-- Show data scan of table: bigquery_rideshare_trip
-- https://console.cloud.google.com/dataplex/govern/profile?project=${project_id}
-- * Data Quality metrics *
-- https://console.cloud.google.com/dataplex/govern/quality?project=${project_id}
-- See results in data catalog
-- https://console.cloud.google.com/dataplex/search?project=${project_id}&q=${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip
-- * Security (RLS/CLS/DM to both internal/BigLake tables) *
-- Create a BigLake table over the Zone data
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_rideshare_zone_csv`
(
location_id INTEGER,
borough STRING,
zone STRING,
service_zone STRING
)
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
format = "CSV",
field_delimiter = '|',
skip_leading_rows = 1,
uris = ['gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_zone/*.csv']
);
-- Show CSV table (all data)
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv` LIMIT 1000;
-- Query: Create an access policy so the admin (you) can only see Manhattan data
CREATE OR REPLACE ROW ACCESS POLICY rls_biglake_rideshare_zone_csv
ON `${project_id}.technical_demo.biglake_rideshare_zone_csv`
GRANT TO ("user:${gcp_account_name}")
FILTER USING (borough = 'Manhattan');
-- See just the data you are allowed to see
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
-- Edit the table and show Data Masking / Column level security on the service zone
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
DROP ALL ROW ACCESS POLICIES ON `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
-- Create an Internal table and apply a policy (same exact code)
CREATE OR REPLACE TABLE `${project_id}.technical_demo.bigquery_rideshare_zone` AS
SELECT * FROM `${project_id}.technical_demo.biglake_rideshare_zone_csv`;
CREATE OR REPLACE ROW ACCESS POLICY rls_biglake_rideshare_zone_csv
ON `${project_id}.technical_demo.bigquery_rideshare_zone`
GRANT TO ("user:${gcp_account_name}")
FILTER USING (service_zone = 'Yellow Zone');
SELECT * FROM `${project_id}.technical_demo.bigquery_rideshare_zone`;
DROP ALL ROW ACCESS POLICIES ON `${project_id}.technical_demo.bigquery_rideshare_zone`;
--------------------------------------------------------------------------------------------------------
-- PART 3: Activate it all - BQML, universal semantic layer, connected dashboards, BI Engine (1 click), Vertex AI (1-click deploy and drift detection), etc.
--------------------------------------------------------------------------------------------------------
/*
- BQML
- Create Model
- Explainable AI
- Integration with Vertex AI
- Model Registry
- REST API to expose to consuming apps
- Build model in Vertex (ML Ops and ingest into BQ - Import)
- Show time travel
- ML Ops pipeline (TO DO)
- JSON Analytics / data type
- BigSearch
- Query Acceleration
- Unstructured data analysis
- BigSpark
*/
-- * BQML - Create a Model * (takes 30 seconds)
-- Save the model in Vertex AI model registry
-- https://cloud.google.com/bigquery/docs/bqml-introduction#supported_models
-- https://cloud.google.com/bigquery/docs/reference/standard-sql/bigqueryml-syntax-create#create_model_syntax
CREATE OR REPLACE MODEL `${project_id}.technical_demo.model_churn`
OPTIONS(
MODEL_TYPE="LOGISTIC_REG", -- or BOOSTED_TREE_CLASSIFIER, DNN_CLASSIFIER, AUTOML_CLASSIFIER
INPUT_LABEL_COLS=["churned"],
MODEL_REGISTRY = "vertex_ai"
) AS
SELECT * EXCEPT(user_first_engagement, user_pseudo_id)
FROM `${project_id}.${bigquery_thelook_ecommerce_dataset}.training_data`;
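-- Inspect the model metrics (precision, recall, ROC AUC) before predicting.
-- A minimal sketch: it evaluates against the training data purely for demo
-- purposes (normally you would hold out an evaluation split).
SELECT *
FROM ML.EVALUATE(MODEL `${project_id}.technical_demo.model_churn`,
    (SELECT * EXCEPT(user_first_engagement, user_pseudo_id)
     FROM `${project_id}.${bigquery_thelook_ecommerce_dataset}.training_data`));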
-- * BQML - Explainable AI *
-- Use Explainable AI for the prediction
EXECUTE IMMEDIATE """
SELECT *
FROM ML.EXPLAIN_PREDICT(MODEL `${project_id}.technical_demo.model_churn`,
(SELECT * FROM `${project_id}.${bigquery_thelook_ecommerce_dataset}.training_data` LIMIT 10));
""";
-- * Integration with Vertex AI - Model Registry *
-- REST API to expose to consuming apps
-- https://console.cloud.google.com/vertex-ai/models?project=${project_id}
-- * Build model in Vertex (ML Ops and ingest into BQ - Import) *
-- Import a model (Tensorflow)
-- Show notebook / model
CREATE OR REPLACE MODEL `${project_id}.technical_demo.model_tf_linear_regression_fare`
OPTIONS (MODEL_TYPE='TENSORFLOW',
MODEL_PATH='gs://${processed_bucket_name}/tensorflow/taxi_fare_model/linear_regression/*');
-- Run a prediction (note you can run the same prediction in your notebook and get the same result)
EXECUTE IMMEDIATE """
SELECT *
FROM ML.PREDICT(MODEL `${project_id}.technical_demo.model_tf_linear_regression_fare`,
(
SELECT [10.0,20.0] AS normalization_input
));
""";
-- * Show time travel (make a delete mistake, then see the prior data) - this is for easy recovery of data *
CREATE OR REPLACE TABLE `${project_id}.technical_demo.bigquery_time_travel`
(
Vendor_Id INTEGER,
Pickup_DateTime TIMESTAMP,
Dropoff_DateTime TIMESTAMP,
PULocationID INTEGER,
DOLocationID INTEGER,
Trip_Distance FLOAT64,
Total_Amount FLOAT64,
PartitionDate DATE
)
PARTITION BY PartitionDate
AS SELECT
Vendor_Id,
Pickup_DateTime,
Dropoff_DateTime,
PULocationID,
DOLocationID,
Trip_Distance,
Total_Amount,
DATE(year, month, 1) as PartitionDate
FROM `${project_id}.${bigquery_taxi_dataset}.ext_green_trips_parquet`
WHERE DATE(year, month, 1) = '2021-01-01';
-- Query: 76516
SELECT COUNT(*) AS RecordCount FROM `${project_id}.technical_demo.bigquery_time_travel`;
-- Make a delete mistake
DELETE FROM `${project_id}.technical_demo.bigquery_time_travel` WHERE PULocationID <= 47;
-- Query: 59114
SELECT COUNT(*) AS RecordCount FROM `${project_id}.technical_demo.bigquery_time_travel`;
-- Query: 76516 - See the prior data before the DELETE
-- NOTE: You might need to change INTERVAL 15 SECOND to more time if you spend time talking
SELECT COUNT(*) AS RecordCount
FROM `${project_id}.technical_demo.bigquery_time_travel`
FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 SECOND);
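-- Recover from the mistake by re-inserting the deleted rows from the snapshot
-- (a minimal sketch, assuming the 15 second offset still points before the DELETE)
INSERT INTO `${project_id}.technical_demo.bigquery_time_travel`
SELECT *
FROM `${project_id}.technical_demo.bigquery_time_travel`
     FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 15 SECOND)
WHERE PULocationID <= 47;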
-- * JSON Analytics / data type *
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json`
(
taxi_json JSON
)
WITH PARTITION COLUMNS (
-- column order must match the external path
year INTEGER,
month INTEGER
)
OPTIONS (
format = "CSV",
field_delimiter = '\u00fe',
skip_leading_rows = 0,
hive_partition_uri_prefix = "gs://${processed_bucket_name}/processed/taxi-data/yellow/trips_table/json/",
uris = ['gs://${processed_bucket_name}/processed/taxi-data/yellow/trips_table/json/*.json']
);
-- JSON Analytics with JSON data
SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json` LIMIT 100;
SELECT taxi_json.Vendor_Id,
taxi_json.Rate_Code_Id,
taxi_json.Fare_Amount
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json`
LIMIT 100;
-- For each day, find the highest passenger count and total amount per payment type
WITH TaxiData AS
(
SELECT CAST(JSON_VALUE(taxi_trips.taxi_json.Pickup_DateTime) AS TIMESTAMP) AS Pickup_DateTime,
SAFE.INT64(taxi_trips.taxi_json.Payment_Type_Id) AS Payment_Type_Id,
SAFE.INT64(taxi_trips.taxi_json.Passenger_Count) AS Passenger_Count,
SAFE.FLOAT64(taxi_trips.taxi_json.Total_Amount) AS Total_Amount,
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_json` AS taxi_trips
WHERE CAST(JSON_VALUE(taxi_trips.taxi_json.Pickup_DateTime) AS TIMESTAMP) BETWEEN '2020-01-01' AND '2020-06-01'
AND SAFE.INT64(taxi_trips.taxi_json.Payment_Type_Id) IN (1,2)
)
, TaxiDataRanking AS
(
SELECT CAST(Pickup_DateTime AS DATE) AS Pickup_Date,
taxi_trips.Payment_Type_Id,
taxi_trips.Passenger_Count,
taxi_trips.Total_Amount,
RANK() OVER (PARTITION BY CAST(Pickup_DateTime AS DATE),
taxi_trips.Payment_Type_Id
ORDER BY taxi_trips.Passenger_Count DESC,
taxi_trips.Total_Amount DESC) AS Ranking
FROM TaxiData AS taxi_trips
)
SELECT Pickup_Date,
Payment_Type_Description,
Passenger_Count,
Total_Amount
FROM TaxiDataRanking
INNER JOIN `${project_id}.${bigquery_taxi_dataset}.payment_type` AS payment_type
ON TaxiDataRanking.Payment_Type_Id = payment_type.Payment_Type_Id
WHERE Ranking = 1
ORDER BY Pickup_Date, Payment_Type_Description;
-- * BigSearch *
-- How many records are we searching
SELECT COUNT(*) AS Cnt FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip` ;
-- Find a credit card number (this is generated data) and substitute it for REPLACE_CREDIT_CARD below
SELECT credit_card_number
FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip`
WHERE credit_card_number IS NOT NULL
LIMIT 1;
-- The table is not partitioned or clustered on this data
-- Click on Job Information and show "Index Usage Mode: FULLY_USED"
SELECT *
FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip`
WHERE SEARCH(credit_card_number,'REPLACE_CREDIT_CARD', analyzer=>'NO_OP_ANALYZER');
-- Search even if we do not know the field name
SELECT *
FROM `${project_id}.${bigquery_rideshare_lakehouse_curated_dataset}.bigquery_rideshare_trip` AS bigquery_rideshare_trip
WHERE SEARCH(bigquery_rideshare_trip,'REPLACE_CREDIT_CARD', analyzer=>'NO_OP_ANALYZER');
-- NOT_INDEXED
-- Bytes processed: 5.83 TB
-- Index Usage Mode: UNUSED
SELECT *
FROM `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_NOT_INDEXED`
WHERE SEARCH(insertId, '40ms4zeh78tg6o6jx');
-- Make sure index is complete: coverage_percentage should be 100%
SELECT table_name, index_name, coverage_percentage
FROM `${project_id}.technical_demo.INFORMATION_SCHEMA.SEARCH_INDEXES`
WHERE index_status = 'ACTIVE';
-- Search the field "insertId" for a value (just one column)
-- Bytes processed: 106.57 MB
-- Index Usage Mode: FULLY_USED
SELECT *
FROM `${project_id}.technical_demo.bigsearch_log_5b_5t_json_hourly_INDEXED`
WHERE SEARCH(insertId, '40ms4zeh78tg6o6jx');
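-- You can also verify index usage in SQL instead of the console (a sketch using
-- the search_statistics field of INFORMATION_SCHEMA.JOBS; lists recent search queries)
SELECT job_id,
       search_statistics.index_usage_mode,
       total_bytes_processed
FROM `region-${bigquery_region}`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE DATE(creation_time) = CURRENT_DATE()
  AND search_statistics.index_usage_mode IS NOT NULL
ORDER BY creation_time DESC
LIMIT 10;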
-- 50 Billion Rows (requires access to the shared project)
SELECT COUNT(*) AS Cnt
FROM `${shared_demo_project_id}.bigquery_features.bigsearch_log_50b_60t_json_hourly` AS LogsIndexed
WHERE SEARCH((internalClusterId, labels), 'service281')
AND TIMESTAMP_TRUNC(timestamp, HOUR) BETWEEN TIMESTAMP("2021-02-01 00:00:00")
AND TIMESTAMP("2021-02-06 00:00:00") ;
-- * Query Acceleration *
/*
Data Size:
- File Count = 5,068,912
- Directory Count = 1,588,355
- The taxi trip table was exported to cloud storage and partitioned by Year, Month, Day, Hour and Minute
- About 75 GB of data was exported and created many small files
*/
-- DO NOT RUN - TOO SLOW (the point of the demo)
-- NO ACCELERATION
-- Duration: >3 min
SELECT *
FROM `${shared_demo_project_id}.bigquery_preview_features.biglake_no_acceleration`
WHERE year = 2021
AND month = 1
AND day = 1
AND hour = 0
AND minute = 0;
-- WITH ACCELERATION
SELECT *
FROM `${shared_demo_project_id}.bigquery_preview_features.biglake_query_acceleration`
WHERE year = 2021
AND month = 1
AND day = 1
AND hour = 0
AND minute = 0;
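-- The acceleration comes from metadata caching on the BigLake table. A minimal
-- sketch of how such a table is configured (the table name is illustrative):
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_accelerated_example`
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
    format = "PARQUET",
    uris = ['gs://${processed_bucket_name}/processed/taxi-data/green/trips_table/parquet/*.parquet'],
    max_staleness = INTERVAL 4 HOUR,
    metadata_cache_mode = 'AUTOMATIC'  -- BigQuery refreshes the metadata cache for you
);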
-- * Unstructured Data Analysis *
-- https://console.cloud.google.com/storage/browser/${gcs_rideshare_lakehouse_raw_bucket}/rideshare_images?pageState=(%22StorageObjectListTable%22:(%22f%22:%22%255B%255D%22))&project=${project_id}&prefix=&forceOnObjectsSortingFiltering=false
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.technical_demo.biglake_unstructured_data`
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS (
object_metadata="DIRECTORY",
uris = ['gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_images/*.jpg',
'gs://${gcs_rideshare_lakehouse_raw_bucket}/rideshare_images/*.jpeg'],
max_staleness=INTERVAL 30 MINUTE,
metadata_cache_mode="MANUAL");
CALL BQ.REFRESH_EXTERNAL_METADATA_CACHE('${project_id}.technical_demo.biglake_unstructured_data');
-- See the data
SELECT * FROM `${project_id}.technical_demo.biglake_unstructured_data` ;
-- Process the images via our object table and show the results
WITH UnstructuredData AS
(
    -- get the image from the object table
SELECT *
FROM `${project_id}.technical_demo.biglake_unstructured_data`
WHERE uri = (SELECT uri FROM `${project_id}.technical_demo.biglake_unstructured_data` LIMIT 1)
)
, ScoreAI AS
(
-- call a remote function
SELECT `${project_id}.rideshare_lakehouse_enriched.ext_udf_ai_localize_objects`(UnstructuredData.uri) AS json_result
FROM UnstructuredData
)
SELECT item.name,
item.score,
ScoreAI.json_result
FROM ScoreAI, UNNEST(JSON_QUERY_ARRAY(ScoreAI.json_result.localized_object_annotations)) AS item;
-- Show all the images processed (object detection, labels, landmarks, logos)
SELECT * FROM `${project_id}.rideshare_lakehouse_enriched.bigquery_rideshare_images_ml_score` LIMIT 100;
-- * BigSpark *
-- BigLake Metastore Service
-- CALL `${project_id}.rideshare_lakehouse_enriched.sp_process_data`();
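-- A minimal sketch of what a Spark stored procedure looks like in BigQuery.
-- The connection name (spark-connection) and runtime_version are assumptions;
-- sp_process_data above is the version deployed by this demo.
/*
CREATE OR REPLACE PROCEDURE `${project_id}.technical_demo.sp_spark_example`()
WITH CONNECTION `${project_id}.${bigquery_region}.spark-connection`
OPTIONS (engine = 'SPARK', runtime_version = '1.1')
LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-example").getOrCreate()

# Read a BigQuery table and run a simple aggregation with Spark
df = spark.read.format("bigquery") \
    .load("${project_id}.technical_demo.bigquery_rideshare_zone")
df.groupBy("borough").count().show()
""";
*/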
--------------------------------------------------------------------------------------------------------
-- PART 4: Intelligent infrastructure (autoscaling to the minute, best price/performance, capacity management, etc.)
--------------------------------------------------------------------------------------------------------
/*
- Autoscaling / Workload management
- Monitoring - Show these views (slide 20): WIP - Landing Launches 27 Mar 2023 - DA Field Council - go/da-fc (TO DO)
- See how much you are using
- Quotas (even for on-demand)
- Query Visualization (optimizer/queues)
- Slot recommender / Info Schema (see pricing / slots)
- Data Modeling (Looker: https://atomfashion.io/analytics/salesoverview | Select Map | Add Age)
- SQL Translate
- Sheets
- BI Engine
*/
-- * Autoscaling / Workload management - 10 second autoscaling *
-- https://console.cloud.google.com/bigquery/admin/reservations;region=${bigquery_region}/create?project=${project_id}&region=${bigquery_region}
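-- Reservations can also be managed in SQL. A sketch (the reservation name,
-- edition, and slot counts are illustrative; left commented out since creating
-- a reservation has billing implications):
/*
CREATE RESERVATION `${project_id}.region-${bigquery_region}.demo-reservation`
OPTIONS (
    edition = 'ENTERPRISE',
    slot_capacity = 0,           -- baseline (committed) slots
    autoscale_max_slots = 100);  -- scales up within seconds as demand spikes
*/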
-- * Monitoring *
-- https://console.cloud.google.com/bigquery/admin/monitoring/resource-utilization?project=${project_id}&region=${bigquery_region}
-- * See how much you are using (Informational Schema) *
-- Compute the cost per Job, Average slots per Job and Max slots per Job (at the job stage level)
-- This will show the cost of each query, the maximum number of slots used, and whether any additional slots were requested (to help gauge reservation sizing)
SELECT project_id,
job_id,
reservation_id,
EXTRACT(DATE FROM creation_time) AS creation_date,
TIMESTAMP_DIFF(end_time, creation_time, SECOND) AS job_duration_seconds,
job_type,
user_email,
total_bytes_billed,
-- 6.25 / 1,099,511,627,776 = 0.00000000000568434188608080 ($6.25 per TB so cost per byte is 0.00000000000568434188608080)
CASE WHEN reservation_id IS NULL
THEN CAST(total_bytes_billed AS BIGDECIMAL) * CAST(0.00000000000568434188608080 AS BIGDECIMAL)
ELSE 0
END
as est_cost,
-- Average slot utilization per job is calculated by dividing
-- total_slot_ms by the millisecond duration of the job
SAFE_DIVIDE(job.total_slot_ms,(TIMESTAMP_DIFF(job.end_time, job.start_time, MILLISECOND))) AS job_avg_slots,
query,
-- Determine the max number of slots used at ANY stage in the query. The average slots might be 55
-- but a single stage might spike to 2000 slots. This is important to know when estimating how many slots to purchase.
MAX(SAFE_DIVIDE(unnest_job_stages.slot_ms,unnest_job_stages.end_ms - unnest_job_stages.start_ms)) AS jobstage_max_slots,
-- Is the job requesting more units of work (slots)? If so, you need more slots.
-- estimatedRunnableUnits = Units of work that can be scheduled immediately.
-- Providing additional slots for these units of work will accelerate the query, if no other query in the reservation needs additional slots.
MAX(unnest_timeline.estimated_runnable_units) AS estimated_runnable_units
FROM `region-${bigquery_region}`.INFORMATION_SCHEMA.JOBS AS job
CROSS JOIN UNNEST(job_stages) as unnest_job_stages
CROSS JOIN UNNEST(timeline) AS unnest_timeline
WHERE project_id = '${project_id}'
AND DATE(creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) AND CURRENT_DATE()
GROUP BY 1,2,3,4,5,6,7,8,9,10,11
ORDER BY creation_date DESC ;
-- * Quotas (even for on-demand) *
-- https://console.cloud.google.com/iam-admin/quotas?referrer=search&project=${project_id}&pageState=(%22allQuotasTable%22:(%22f%22:%22%255B%257B_22k_22_3A_22Metric_22_2C_22t_22_3A10_2C_22v_22_3A_22_5C_22bigquery.googleapis.com%252Fquota%252Fquery%252Fusage_5C_22_22_2C_22s_22_3Atrue_2C_22i_22_3A_22metricName_22%257D%255D%22))
-- * Query Visualization (optimizer/queues) *
SELECT FORMAT_DATE("%w", Pickup_DateTime) AS WeekdayNumber,
FORMAT_DATE("%A", Pickup_DateTime) AS WeekdayName,
vendor.Vendor_Description,
payment_type.Payment_Type_Description,
SUM(taxi_trips.Total_Amount) AS high_value_trips
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips` AS taxi_trips
INNER JOIN `${project_id}.${bigquery_taxi_dataset}.vendor` AS vendor
ON taxi_trips.Vendor_Id = vendor.Vendor_Id
AND taxi_trips.Pickup_DateTime BETWEEN '2020-01-01' AND '2020-06-01'
LEFT JOIN `${project_id}.${bigquery_taxi_dataset}.payment_type` AS payment_type
ON taxi_trips.Payment_Type_Id = payment_type.Payment_Type_Id
GROUP BY 1, 2, 3, 4
HAVING SUM(taxi_trips.Total_Amount) > 50
ORDER BY WeekdayNumber, 3, 4;
-- * Slot recommender *
-- https://console.cloud.google.com/bigquery/admin/reservations;region=${bigquery_region}/slot-estimator?project=${project_id}&region=${bigquery_region}
-- * Data Modeling (Looker: https://atomfashion.io/analytics/salesoverview | Select Map | Add Age) *
-- * SQL Translate *
/*
CREATE VOLATILE TABLE exampleTable (age INT, gender VARCHAR(10));
INS INTO exampleTable (10, 'F');
INS INTO exampleTable (20, 'M');
SEL *
FROM exampleTable
WHERE gender EQ 'F'
AND age LT 15;
*/
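-- Roughly the GoogleSQL the translator produces for the Teradata above (a sketch):
/*
CREATE TEMP TABLE exampleTable (age INT64, gender STRING);
INSERT INTO exampleTable VALUES (10, 'F');
INSERT INTO exampleTable VALUES (20, 'M');
SELECT *
FROM exampleTable
WHERE gender = 'F'
  AND age < 15;
*/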
-- * Sheets *
-- Create sheet w/Taxi Data
-- Create pivot table (values of tip/total amt)
-- * BI Engine *
-- Open BI Engine tab and add BI Engine
-- https://console.cloud.google.com/bigquery/admin/bi-engine?project=${project_id}
-- REMEMBER TO DELETE YOUR Pub/Sub TO BIGQUERY JOB
SELECT 1;