sql-scripts/taxi_dataset/sp_create_demo_dataform.sql (43 lines of code) (raw):
/*##################################################################################
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###################################################################################*/
/*
Use Cases:
- To run the dataform demo, first run this sp to create the necessary objects
Description:
- Create new dataset
- Create table for pub_sub raw data
- Create BigLake table
Clean up / Reset script:
DROP SCHEMA IF EXISTS `${project_id}.dataform_demo` CASCADE;
*/
-- Drop everything so the script can be re-run from a clean state
-- (CASCADE also removes the tables and the stored procedure created below)
DROP SCHEMA IF EXISTS `${project_id}.dataform_demo` CASCADE;
--- Create dataform_demo dataset in the deployment's BigQuery region
CREATE SCHEMA `${project_id}.dataform_demo`
OPTIONS (
location = "${bigquery_region}"
);
-- Let Dataform access: grant the Dataform service agent write access so it
-- can create/modify objects in this dataset when the demo pipeline runs
GRANT `roles/bigquery.dataEditor`
ON SCHEMA `${project_id}.dataform_demo`
TO "serviceAccount:service-${project_number}@gcp-sa-dataform.iam.gserviceaccount.com";
--- Create table for pub_sub raw data
-- Each row holds one raw message as a JSON string in `data`; the table is
-- ingestion-time partitioned by hour via the _PARTITIONTIME pseudo-column
-- (the INSERT at the bottom of this script sets _PARTITIONTIME explicitly).
CREATE OR REPLACE TABLE `${project_id}.dataform_demo.taxi_trips_pub_sub`
(
data STRING
)
PARTITION BY TIMESTAMP_TRUNC(_PARTITIONTIME, HOUR);
-- For Dataform to call to create the BigLake table (yes, we are creating a stored procedure from within a stored procedure)
-- Params:
--   name - table name created under dataform_demo (interpolated into the DDL)
--   uris - pre-quoted, comma-separated GCS URI list pasted into OPTIONS(uris=[...]);
--          the caller must supply the surrounding quotes (see the CALL below)
-- NOTE(review): name/uris are spliced into the statement via FORMAT(%s) with no
-- sanitization. Acceptable here because only trusted demo code calls it with
-- literal arguments, but do not expose this procedure to untrusted input.
CREATE OR REPLACE PROCEDURE `${project_id}.dataform_demo.create_biglake_table`(name STRING, uris STRING)
BEGIN
EXECUTE IMMEDIATE FORMAT("""
CREATE OR REPLACE EXTERNAL TABLE `${project_id}.dataform_demo.%s`
WITH CONNECTION `${project_id}.${bigquery_region}.biglake-connection`
OPTIONS(
uris=[%s],
format="PARQUET"
)
""",name,uris);
END;
-- Call the stored procedure just created: builds the biglake_payment_type
-- external table over the processed payment_type Parquet files in GCS.
-- Note the uris argument carries its own single quotes, as the procedure requires.
CALL `${project_id}.dataform_demo.create_biglake_table` ('biglake_payment_type',"'gs://${bucket_name}/processed/taxi-data/payment_type_table/*.parquet'");
-- This replaces the Pub/Sub topic since we already have loaded the same data into a table
-- The DAG "sample-dataflow-start-streaming-job" runs for 4 hours when the Terraform is deployed,
-- so the taxi_trips_streaming table will have data.
--
-- If you want more data (to show the incremental features of Dataform), run Dataform, then
-- run the DAG "sample-dataflow-start-streaming-job" (wait a few minutes for it to start) and
-- then re-run the SQL below a second time. This will add the new rows streamed into the taxi_trips_streaming
-- table to the dataform_demo.taxi_trips_pub_sub table.
-- NOTE: The Dataform job does a Timestamp Subtract of 1 hour for data to load. The Dataflow job must be running for 1+ hours to
-- have data processed by dataform.
/*
{
"ride_id": "1a80b0e2-2adb-431b-8455-aa7ee51839f3",
"point_idx": 69,
"latitude": 40.758160000000004,
"longitude": -73.97748,
"timestamp": "2022-11-28 19:15:31.062430+00",
"meter_reading": 4.233383,
"meter_increment": 0.06135338,
"ride_status": "enroute",
"passenger_count": 1
}
*/
-- Incrementally load the raw table: hand-build a JSON string per streaming row
-- (mirroring the sample Pub/Sub payload above) and set the ingestion-time
-- partition from the event timestamp, truncated to the hour.
-- NOTE(review): CONCAT propagates NULL in BigQuery, so a row with any NULL
-- field produces a NULL `data` value; string fields (ride_id, ride_status) are
-- not JSON-escaped. Fine for this generated demo data — confirm before reusing
-- with other sources. Numeric/timestamp columns are concatenated directly, which
-- presumably relies on the streaming table's column types — verify against its schema.
INSERT INTO `${project_id}.dataform_demo.taxi_trips_pub_sub` (_PARTITIONTIME,data)
SELECT TIMESTAMP_TRUNC(timestamp, HOUR),
CONCAT(
"{ ",
'"ride_id"',":",'"',ride_id,'", ',
'"point_idx"',":",point_idx,", ",
'"latitude"',":",latitude,", ",
'"longitude"',":",longitude,", ",
'"timestamp"',":",'"',timestamp,'", ',
'"meter_reading"',":",meter_reading,", ",
'"meter_increment"',":",meter_increment,", ",
'"ride_status"',":",'"',ride_status,'", ',
'"passenger_count"',":",passenger_count,
" }") AS Data
FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` AS parent
-- Anti-join keyed on ride_id (parsed back out of the stored JSON) so re-running
-- this statement only appends rides not already present — safe to run repeatedly.
WHERE NOT EXISTS (SELECT 1
FROM `${project_id}.dataform_demo.taxi_trips_pub_sub` AS child
WHERE parent.ride_id = JSON_EXTRACT_SCALAR(child.data,'$.ride_id'));