courses/DSL/pub-sub-examples/pub-sub-dataflow-sliding-window-count-bigquery.ipynb (153 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "fd01a4a3-815d-468c-8bfa-89af1b420d25",
   "metadata": {},
   "source": [
    "# Pub/Sub Dataflow Sliding Window Count to BigQuery\n",
    "\n",
    "Reads JSON messages from a Pub/Sub subscription, counts messages per `route` over sliding windows, and appends one row per route and window to a BigQuery table."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6b6d5421-9d81-447d-8e9d-103d16a26eae",
   "metadata": {},
   "outputs": [],
   "source": [
    "# %pip installs into the environment of the running kernel.\n",
    "# The requirement is quoted so the shell does not interpret the brackets,\n",
    "# and pinned to the Beam version this notebook's kernel is built for.\n",
    "%pip install --quiet \"apache-beam[gcp]==2.56.0\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "13d10b68-4c77-49e8-ab2f-f3547c4563b9",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Define your Google Cloud project ID and Pub/Sub subscription name\n",
    "project_id = 'dsl-dar'\n",
    "subscription_name = 'clicks-dataflow-subscription'\n",
    "dataset_id = 'pubsub_dataset'\n",
    "table_id = 'clicks-per-minute'\n",
    "\n",
    "# Sliding window settings, in seconds: each window covers window_size_seconds\n",
    "# and a new window starts every window_period_seconds.\n",
    "window_size_seconds = 60\n",
    "window_period_seconds = 15"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5dd98117-7dc2-4029-860a-664782b78fda",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import json\n",
    "import logging\n",
    "from datetime import datetime\n",
    "\n",
    "import apache_beam as beam\n",
    "from apache_beam.io.gcp.bigquery import WriteToBigQuery\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\n",
    "from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode, AfterWatermark\n",
    "\n",
    "\n",
    "class ParseMessage(beam.DoFn):\n",
    "    \"\"\"Parse a JSON string, dropping payloads that are not valid JSON.\"\"\"\n",
    "\n",
    "    def process(self, element):\n",
    "        try:\n",
    "            parsed_element = json.loads(element)\n",
    "        except ValueError as err:\n",
    "            # json.JSONDecodeError is a ValueError. Log and drop the message so\n",
    "            # one malformed payload does not raise out of the DoFn.\n",
    "            logging.warning('Dropping message that is not valid JSON: %s', err)\n",
    "            return\n",
    "        yield parsed_element\n",
    "\n",
    "\n",
    "class ExtractRoute(beam.DoFn):\n",
    "    \"\"\"Emit the 'route' value of a parsed message, skipping messages without one.\"\"\"\n",
    "\n",
    "    def process(self, element):\n",
    "        # Valid JSON is not necessarily an object, so guard before calling .get().\n",
    "        route = element.get('route') if isinstance(element, dict) else None\n",
    "        if route is None:\n",
    "            # A None route would be counted as its own key and then written to\n",
    "            # the Route column, which the table schema declares REQUIRED.\n",
    "            logging.warning('Dropping message without a route field')\n",
    "            return\n",
    "        yield route\n",
    "\n",
    "\n",
    "class FormatOutput(beam.DoFn):\n",
    "    \"\"\"Turn a (route, count) pair into a row dict stamped with the window end.\"\"\"\n",
    "\n",
    "    def process(self, element, window=beam.DoFn.WindowParam):\n",
    "        route, count = element\n",
    "        window_end = window.end.to_utc_datetime()\n",
    "        yield {\n",
    "            'Route': route,\n",
    "            'Count': count,\n",
    "            'Timestamp': window_end.strftime('%Y-%m-%d %H:%M:%S')\n",
    "        }\n",
    "\n",
    "\n",
    "# Define your pipeline options.\n",
    "# flags=[] stops PipelineOptions from parsing sys.argv, which in a notebook\n",
    "# holds the Jupyter kernel's own command-line arguments.\n",
    "options = PipelineOptions(flags=[])\n",
    "options.view_as(StandardOptions).streaming = True\n",
    "options.view_as(StandardOptions).runner = 'DirectRunner'  # Use DataflowRunner to run on Google Cloud Dataflow\n",
    "\n",
    "# Construct the full subscription path\n",
    "subscription = f'projects/{project_id}/subscriptions/{subscription_name}'\n",
    "\n",
    "# Define the BigQuery table schema\n",
    "table_schema = {\n",
    "    'fields': [\n",
    "        {'name': 'Route', 'type': 'STRING', 'mode': 'REQUIRED'},\n",
    "        {'name': 'Count', 'type': 'INTEGER', 'mode': 'REQUIRED'},\n",
    "        {'name': 'Timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},\n",
    "    ]\n",
    "}\n",
    "\n",
    "# Create the pipeline\n",
    "p = beam.Pipeline(options=options)\n",
    "\n",
    "messages = (\n",
    "    p\n",
    "    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)\n",
    "    | 'DecodeMessage' >> beam.Map(lambda x: x.decode('utf-8'))\n",
    "    | 'ParseJSON' >> beam.ParDo(ParseMessage())\n",
    "    | 'ExtractRoute' >> beam.ParDo(ExtractRoute())\n",
    "    | 'WindowIntoSlidingWindows' >> beam.WindowInto(\n",
    "        beam.window.SlidingWindows(\n",
    "            size=window_size_seconds,\n",
    "            period=window_period_seconds\n",
    "        )\n",
    "    )\n",
    "    | 'CountPerRoute' >> beam.combiners.Count.PerElement()\n",
    "    | 'FormatOutput' >> beam.ParDo(FormatOutput())\n",
    "    | 'WriteToBigQuery' >> WriteToBigQuery(\n",
    "        table=f'{project_id}:{dataset_id}.{table_id}',\n",
    "        schema=table_schema,\n",
    "        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,\n",
    "        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED\n",
    "    )\n",
    ")\n",
    "\n",
    "# Run the pipeline.\n",
    "# NOTE: wait_until_finish() blocks this cell for as long as the pipeline runs;\n",
    "# for a streaming pipeline, interrupt the kernel to stop waiting.\n",
    "result = p.run()\n",
    "result.wait_until_finish()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c33a7457-5f73-4ad1-b968-aff629a576d4",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "environment": {
   "kernel": "apache-beam-2.56.0",
   "name": ".m116",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/:m116"
  },
  "kernelspec": {
   "display_name": "Apache Beam 2.56.0 (Local)",
   "language": "python",
   "name": "apache-beam-2.56.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}