courses/DSL/pub-sub-examples/pub-sub-dataflow-sliding-window-count.ipynb (149 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "fd01a4a3-815d-468c-8bfa-89af1b420d25",
"metadata": {},
"source": [
"# Pub/Sub Dataflow Sliding Window Count"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b6d5421-9d81-447d-8e9d-103d16a26eae",
"metadata": {},
"outputs": [],
"source": [
"! pip install apache_beam apache-beam[gcp] --quiet"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "13d10b68-4c77-49e8-ab2f-f3547c4563b9",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Define your Google Cloud project ID and Pub/Sub subscription name\n",
"project_id = 'dsl-dar'\n",
"subscription_name = 'clicks-dataflow-subscription'"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e4428fb7-47e1-4191-bc73-4bad408fb0cd",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\n",
"from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode, AfterWatermark\n",
"import json\n",
"\n",
"class ParseMessage(beam.DoFn):\n",
" def process(self, element):\n",
" parsed_element = json.loads(element)\n",
" yield parsed_element\n",
"\n",
"class ExtractRoute(beam.DoFn):\n",
" def process(self, element):\n",
" route = element.get('route')\n",
" yield route\n",
"\n",
"class FormatOutput(beam.DoFn):\n",
" def process(self, element):\n",
" route, count = element\n",
" yield f'Route: {route}, Clicks: {count}'\n",
"\n",
"\n",
"# Define your pipeline options\n",
"options = PipelineOptions()\n",
"options.view_as(StandardOptions).streaming = True\n",
"options.view_as(StandardOptions).runner = 'DirectRunner' # Use DataflowRunner to run on Google Cloud Dataflow\n",
"\n",
"# Construct the full subscription path\n",
"subscription = f'projects/{project_id}/subscriptions/{subscription_name}'\n",
"\n",
"# Create the pipeline\n",
"p = beam.Pipeline(options=options)\n",
"\n",
"messages = (\n",
" p\n",
" | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)\n",
" | 'DecodeMessage' >> beam.Map(lambda x: x.decode('utf-8'))\n",
" | 'ParseJSON' >> beam.ParDo(ParseMessage())\n",
" | 'ExtractRoute' >> beam.ParDo(ExtractRoute())\n",
" | 'WindowIntoSlidingWindows' >> beam.WindowInto(\n",
" beam.window.SlidingWindows(size=60, period=15),\n",
" trigger=AfterWatermark(),\n",
" accumulation_mode=AccumulationMode.ACCUMULATING\n",
" )\n",
" | 'CountPerRoute' >> beam.combiners.Count.PerElement()\n",
" | 'FormatOutput' >> beam.ParDo(FormatOutput())\n",
" | 'PrintMessage' >> beam.Map(print)\n",
")\n",
"\n",
"# Run the pipeline\n",
"result = p.run()\n",
"result.wait_until_finish()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c33a7457-5f73-4ad1-b968-aff629a576d4",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "8b1dd6ca-f18e-43f5-9949-1fb79b08933b",
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"id": "5e306de7-998a-495e-9e57-ec51640e449b",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}