courses/DSL/pub-sub-examples/pub-sub-dataflow-sliding-window-count-bigquery.ipynb (153 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "fd01a4a3-815d-468c-8bfa-89af1b420d25",
"metadata": {},
"source": [
"# Pub/Sub Dataflow Sliding Window Count to BigQuery"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b6d5421-9d81-447d-8e9d-103d16a26eae",
"metadata": {},
"outputs": [],
"source": [
"# %pip installs into the environment of the running kernel (a bare `!pip` may target another Python).\n",
"# The [gcp] extra already pulls in apache-beam itself; the pin matches this notebook's Apache Beam 2.56.0 kernel.\n",
"%pip install --quiet \"apache-beam[gcp]==2.56.0\""
]
},
{
"cell_type": "code",
"execution_count": 1,
"id": "13d10b68-4c77-49e8-ab2f-f3547c4563b9",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Define your Google Cloud project ID and Pub/Sub subscription name\n",
"project_id = 'dsl-dar'\n",
"subscription_name = 'clicks-dataflow-subscription'\n",
"dataset_id = 'pubsub_dataset'\n",
"table_id = 'clicks-per-minute'"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5dd98117-7dc2-4029-860a-664782b78fda",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import json\n",
"import logging\n",
"from datetime import datetime\n",
"\n",
"import apache_beam as beam\n",
"from apache_beam.io.gcp.bigquery import WriteToBigQuery\n",
"from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\n",
"from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode, AfterWatermark\n",
"\n",
"class ParseMessage(beam.DoFn):\n",
"    \"\"\"Parse a JSON-encoded message string into a Python object.\n",
"\n",
"    Input that is not valid JSON is logged and dropped instead of raised, so a\n",
"    single malformed message cannot fail the streaming pipeline.\n",
"    \"\"\"\n",
"\n",
"    def process(self, element):\n",
"        \"\"\"Yield the value decoded from ``element``.\n",
"\n",
"        Args:\n",
"            element: JSON text of one message.\n",
"\n",
"        Yields:\n",
"            The decoded JSON value; nothing when ``element`` is not valid JSON.\n",
"        \"\"\"\n",
"        try:\n",
"            parsed_element = json.loads(element)\n",
"        except json.JSONDecodeError as exc:\n",
"            # Drop the bad payload; raising here would abort processing of the message.\n",
"            logging.warning('Skipping message that is not valid JSON: %s', exc)\n",
"            return\n",
"        yield parsed_element\n",
"\n",
"class ExtractRoute(beam.DoFn):\n",
"    \"\"\"Emit the 'route' value of a parsed event.\n",
"\n",
"    Events that are not JSON objects, or that carry no 'route' value, are logged\n",
"    and skipped: a None route would otherwise be counted as a key and later\n",
"    violate the REQUIRED 'Route' column of the BigQuery schema.\n",
"    \"\"\"\n",
"\n",
"    def process(self, element):\n",
"        \"\"\"Yield ``element['route']`` when it is present.\n",
"\n",
"        Args:\n",
"            element: The decoded JSON value of one message (expected to be a dict).\n",
"\n",
"        Yields:\n",
"            The route value; nothing when it is missing, None, or ``element`` is not a dict.\n",
"        \"\"\"\n",
"        # Valid JSON is not necessarily an object (it may be a list, number, ...).\n",
"        route = element.get('route') if isinstance(element, dict) else None\n",
"        if route is None:\n",
"            logging.warning('Skipping event with no route: %r', element)\n",
"            return\n",
"        yield route\n",
"\n",
"class FormatOutput(beam.DoFn):\n",
"    \"\"\"Convert a windowed ``(route, count)`` pair into a row dict.\"\"\"\n",
"\n",
"    def process(self, element, window=beam.DoFn.WindowParam):\n",
"        \"\"\"Yield one dict with 'Route', 'Count' and 'Timestamp' keys.\n",
"\n",
"        'Timestamp' is the end of the element's window converted to a UTC\n",
"        datetime and formatted as ``YYYY-MM-DD HH:MM:SS``.\n",
"        \"\"\"\n",
"        key, total = element\n",
"        end_of_window = window.end.to_utc_datetime()\n",
"        stamp = end_of_window.strftime('%Y-%m-%d %H:%M:%S')\n",
"        row = {'Route': key, 'Count': total, 'Timestamp': stamp}\n",
"        yield row\n",
"\n",
"# Pipeline options: streaming mode on the DirectRunner.\n",
"# Set the runner to 'DataflowRunner' to run on Google Cloud Dataflow instead.\n",
"options = PipelineOptions()\n",
"options.view_as(StandardOptions).runner = 'DirectRunner'\n",
"options.view_as(StandardOptions).streaming = True\n",
"\n",
"# Fully qualified path of the Pub/Sub subscription to read from\n",
"subscription = f'projects/{project_id}/subscriptions/{subscription_name}'\n",
"\n",
"# BigQuery table schema: every output column is REQUIRED\n",
"table_schema = {\n",
"    'fields': [\n",
"        {'name': column, 'type': bq_type, 'mode': 'REQUIRED'}\n",
"        for column, bq_type in (\n",
"            ('Route', 'STRING'),\n",
"            ('Count', 'INTEGER'),\n",
"            ('Timestamp', 'TIMESTAMP'),\n",
"        )\n",
"    ]\n",
"}\n",
"\n",
"# Build the pipeline in two stages: count routes per sliding window, then write the counts out.\n",
"p = beam.Pipeline(options=options)\n",
"\n",
"route_counts = (\n",
"    p\n",
"    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)\n",
"    | 'DecodeMessage' >> beam.Map(lambda payload: payload.decode('utf-8'))\n",
"    | 'ParseJSON' >> beam.ParDo(ParseMessage())\n",
"    | 'ExtractRoute' >> beam.ParDo(ExtractRoute())\n",
"    # Windows of size 60 that start every 15 (seconds in the Beam Python SDK)\n",
"    | 'WindowIntoSlidingWindows' >> beam.WindowInto(beam.window.SlidingWindows(size=60, period=15))\n",
"    | 'CountPerRoute' >> beam.combiners.Count.PerElement()\n",
")\n",
"\n",
"messages = (\n",
"    route_counts\n",
"    | 'FormatOutput' >> beam.ParDo(FormatOutput())\n",
"    | 'WriteToBigQuery' >> WriteToBigQuery(\n",
"        table=f'{project_id}:{dataset_id}.{table_id}',\n",
"        schema=table_schema,\n",
"        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,\n",
"        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
"    )\n",
")\n",
"\n",
"# Start the pipeline; this cell blocks while the pipeline is running\n",
"result = p.run()\n",
"result.wait_until_finish()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c33a7457-5f73-4ad1-b968-aff629a576d4",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}