sql-scripts/taxi_dataset/sp_demo_taxi_streaming_data.sql (123 lines of code) (raw):

/*################################################################################## # Copyright 2022 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. ###################################################################################*/ /* Prerequisites: - In Composer / Airflow start the DAG: sample-dataflow-streaming-bigquery - It will take several minutes for the DAG to start - To show machine learning on the stream run the following stored procedure at least 30 minutes before demoing - CALL `${project_id}.${bigquery_taxi_dataset}.sp_demo_machine_learning_anomaly_fee_amount`(); Use Cases: - Receive realtime data from streaming sources directly into BigQuery Description: - Shows streaming data from Pub/Sub -> Dataflow -> BigQuery - BigQuery has streaming ingestion where data is available as soon as it is ingested - Micro-batching is not used, the data is immediate Reference: - https://cloud.google.com/bigquery/docs/write-api Clean up / Reset script: n/a */ -- Open the table taxi_trips_streaming -- Click on details to see the streaming buffer stats -- Some of the data is in columnar format and some is in row format, but when we query -- we do not need to worry about where the data is, BigQuery just queries the data. -- Current data within past hour (run over and over again to show data streaming) SELECT COUNT(*) AS RecordCount FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 1 HOUR); -- Same SQL as above, just orders the data -- Show current data in past hour (slower query due to ORDER BY) SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 1 HOUR) ORDER BY timestamp DESC; -- Data older than last hour SELECT COUNT(*) AS RecordCount FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` WHERE timestamp < TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 1 HOUR); -- Show data from 1 hour ago to 2 hours ago (provided the streaming job has been running for the past 3 hours) SELECT * FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` WHERE timestamp BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 2 HOUR) AND TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 1 HOUR) LIMIT 100; -- Same SQL as above, but does a Count -- Show data from 1 hour ago to 2 hours ago (provided the streaming job has been running for the past 3 hours) SELECT COUNT(*) AS RecordCount FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` WHERE timestamp BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 2 HOUR) AND TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 1 HOUR); ------------------------------------------------------------------------------------ -- Self join the streaming data -- The streaming data has a "pickup" record (message) and then a "dropoff" message -- We need to match the pickup and dropoff seperate records (by ride_id) so we can compute the time and distance between pickup and dropoff -- The distance is computed using Geospacial functions in BigQuery ------------------------------------------------------------------------------------ WITH LatestData AS ( -- Get last 15 minutes of data SELECT ride_id, timestamp, longitude, latitude, meter_reading, ride_status FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 15 MINUTE) ) , results AS ( -- for the last 15 minutes match the pickup with dropoff -- compute miles since ST_DISTANCE is in meters (divide by 1609.34) SELECT enroute.ride_id, enroute.timestamp AS PickupTime, dropoff.timestamp AS DropoffTime, ST_DISTANCE(ST_GEOGPOINT(enroute.longitude, enroute.latitude), ST_GEOGPOINT(dropoff.longitude, dropoff.latitude)) / 1609.34 AS Trip_Distance, TIMESTAMP_DIFF(dropoff.timestamp, enroute.timestamp, MINUTE) AS DurationMinutes, dropoff.meter_reading AS Fare_Amount FROM LatestData AS enroute INNER JOIN LatestData AS dropoff ON enroute.ride_id = dropoff.ride_id AND enroute.ride_status = 'enroute' AND dropoff.ride_status = 'dropoff' ) SELECT * FROM results; ------------------------------------------------------------------------------------ -- Run machine learning on the streaming data -- NOTE You must run the sp_demo_machine_learning_anomaly_fee_amount procedure First in order to train the model! ------------------------------------------------------------------------------------ EXECUTE IMMEDIATE """ WITH LatestData AS ( SELECT ride_id, timestamp, longitude, latitude, meter_reading, ride_status FROM `${project_id}.${bigquery_taxi_dataset}.taxi_trips_streaming` WHERE timestamp > TIMESTAMP_SUB(CURRENT_TIMESTAMP(),INTERVAL 15 MINUTE) ) , results AS ( SELECT enroute.ride_id, enroute.timestamp AS PickupTime, dropoff.timestamp AS DropoffTime, ST_DISTANCE(ST_GEOGPOINT(enroute.longitude, enroute.latitude), ST_GEOGPOINT(dropoff.longitude, dropoff.latitude)) / 1609.34 AS Trip_Distance, TIMESTAMP_DIFF(dropoff.timestamp, enroute.timestamp, MINUTE) AS DurationMinutes, dropoff.meter_reading AS Fare_Amount FROM LatestData AS enroute INNER JOIN LatestData AS dropoff ON enroute.ride_id = dropoff.ride_id AND enroute.ride_status = 'enroute' AND dropoff.ride_status = 'dropoff' ) SELECT * FROM ML.DETECT_ANOMALIES (MODEL `${project_id}.${bigquery_taxi_dataset}.model_predict_anomoly_v1`, STRUCT(.2 AS contamination), (SELECT '161-237' AS Trip, DurationMinutes, Trip_Distance, Fare_Amount FROM results LIMIT 100 )) WHERE is_anomaly = TRUE; """; ------------------------------------------------------------------------------------ -- Streaming delivery data -- This is from the thelook_ecommerce dataset procedure "demo_queries" -- It shows how we can using streaming data to see how our delivery times are increasing or remaining steady ------------------------------------------------------------------------------------ WITH MaxStreamingDate AS ( -- Use the max date (versus current time in case the streaming job is not started) SELECT MAX(delivery_time) AS max_delivery_time FROM `${project_id}.${bigquery_thelook_ecommerce_dataset}.product_deliveries_streaming` ) -- SELECT max_delivery_time FROM MaxStreamingDate; , AverageDeliveryTime AS ( SELECT CASE WHEN delivery_time > TIMESTAMP_SUB(max_delivery_time, INTERVAL 60 MINUTE) THEN 'CurrentWindow' ELSE 'PriorWindow' END AS TimeWindow, distribution_center_id, delivery_minutes, distance FROM `${project_id}.${bigquery_thelook_ecommerce_dataset}.product_deliveries_streaming` CROSS JOIN MaxStreamingDate WHERE delivery_time > TIMESTAMP_SUB(max_delivery_time, INTERVAL 5 DAY) AND delivery_minutes > 0 AND distance > 0 ) --SELECT * FROM AverageDeliveryTime ORDER BY distribution_center_id, TimeWindow; , PivotData AS ( SELECT * FROM AverageDeliveryTime PIVOT (AVG(delivery_minutes) avg_delivery_minutes, AVG(distance) avg_distance, COUNT(*) nbr_of_deliveries FOR TimeWindow IN ('CurrentWindow', 'PriorWindow')) ) -- SELECT * FROM PivotData; SELECT distribution_centers.name AS distribution_center, nbr_of_deliveries_CurrentWindow AS deliveries_current, nbr_of_deliveries_PriorWindow AS deliveries_prior, ROUND(avg_distance_CurrentWindow,1) AS distance_current, ROUND(avg_distance_PriorWindow,1) AS distance_prior, CASE WHEN avg_distance_CurrentWindow > avg_distance_PriorWindow + (avg_distance_PriorWindow * .15) THEN 'High' WHEN avg_distance_CurrentWindow > avg_distance_PriorWindow + (avg_distance_PriorWindow * .10) THEN 'Med' WHEN avg_distance_CurrentWindow > avg_distance_PriorWindow + (avg_distance_PriorWindow * .05) THEN 'Low' ELSE 'Normal' END AS distance_trend, ROUND(avg_delivery_minutes_CurrentWindow,1) AS minutes_current, ROUND(avg_delivery_minutes_PriorWindow,1) AS minutes_prior, CASE WHEN avg_delivery_minutes_CurrentWindow > avg_delivery_minutes_PriorWindow + (avg_delivery_minutes_PriorWindow * .15) THEN 'High' WHEN avg_delivery_minutes_CurrentWindow > avg_delivery_minutes_PriorWindow + (avg_delivery_minutes_PriorWindow * .10) THEN 'Med' WHEN avg_delivery_minutes_CurrentWindow > avg_delivery_minutes_PriorWindow + (avg_delivery_minutes_PriorWindow * .05) THEN 'Low' ELSE 'Normal' END AS minutes_trend FROM PivotData INNER JOIN `${project_id}.${bigquery_thelook_ecommerce_dataset}.distribution_centers` AS distribution_centers ON PivotData.distribution_center_id = distribution_centers.id ORDER BY distribution_center_id;