courses/DSL/pub-sub-examples/pub-sub-dataflow-fixed-window-count.ipynb (131 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "fd01a4a3-815d-468c-8bfa-89af1b420d25",
   "metadata": {},
   "source": [
    "# Pub/Sub Dataflow Fixed Window Count\n",
    "\n",
    "Reads click messages from a Pub/Sub subscription with Apache Beam, counts them per `route` in fixed windows, and prints one line per route for each window."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6b6d5421-9d81-447d-8e9d-103d16a26eae",
   "metadata": {},
   "outputs": [],
   "source": [
    "# %pip (rather than !pip) installs into the environment of the running kernel.\n",
    "%pip install --quiet \"apache-beam[gcp]\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "13d10b68-4c77-49e8-ab2f-f3547c4563b9",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Define your Google Cloud project ID and Pub/Sub subscription name\n",
    "project_id = 'dsl-dar'\n",
    "subscription_name = 'clicks-dataflow-subscription'\n",
    "\n",
    "# Length of each fixed counting window, in seconds\n",
    "window_size_seconds = 60"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5dd98117-7dc2-4029-860a-664782b78fda",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import json\n",
    "import logging\n",
    "\n",
    "import apache_beam as beam\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\n",
    "from apache_beam.transforms.trigger import AfterProcessingTime, AccumulationMode, AfterWatermark\n",
    "\n",
    "\n",
    "class ParseMessage(beam.DoFn):\n",
    "    \"\"\"Turn a JSON-encoded message string into a Python dict.\"\"\"\n",
    "\n",
    "    def process(self, element):\n",
    "        \"\"\"Yield the parsed JSON object, skipping payloads that cannot be used.\n",
    "\n",
    "        A message that is not valid JSON, or whose top-level value is not a\n",
    "        JSON object, is logged and dropped so that a single bad message does\n",
    "        not raise inside the streaming pipeline.\n",
    "        \"\"\"\n",
    "        try:\n",
    "            parsed_element = json.loads(element)\n",
    "        except ValueError:  # json.JSONDecodeError is a subclass of ValueError\n",
    "            logging.warning('Skipping message that is not valid JSON: %r', element)\n",
    "            return\n",
    "        if not isinstance(parsed_element, dict):\n",
    "            # ExtractRoute calls .get() on the result, so it has to be a dict.\n",
    "            logging.warning('Skipping message that is not a JSON object: %r', element)\n",
    "            return\n",
    "        yield parsed_element\n",
    "\n",
    "\n",
    "class ExtractRoute(beam.DoFn):\n",
    "    \"\"\"Reduce a parsed message to the value stored under its 'route' key.\"\"\"\n",
    "\n",
    "    def process(self, element):\n",
    "        \"\"\"Yield element['route'], or None when the key is absent.\"\"\"\n",
    "        route = element.get('route')\n",
    "        yield route\n",
    "\n",
    "\n",
    "class FormatOutput(beam.DoFn):\n",
    "    \"\"\"Render a (route, count) pair as a printable line.\"\"\"\n",
    "\n",
    "    def process(self, element):\n",
    "        \"\"\"Yield one formatted string for the given (route, count) tuple.\"\"\"\n",
    "        route, count = element\n",
    "        yield f'Route: {route}, Clicks: {count}'\n",
    "\n",
    "\n",
    "# Define your pipeline options\n",
    "options = PipelineOptions()\n",
    "options.view_as(StandardOptions).streaming = True\n",
    "options.view_as(StandardOptions).runner = 'DirectRunner'  # Use DataflowRunner to run on Google Cloud Dataflow\n",
    "\n",
    "# Construct the full subscription path\n",
    "subscription = f'projects/{project_id}/subscriptions/{subscription_name}'\n",
    "\n",
    "# Create the pipeline\n",
    "p = beam.Pipeline(options=options)\n",
    "\n",
    "messages = (\n",
    "    p\n",
    "    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)\n",
    "    | 'DecodeMessage' >> beam.Map(lambda x: x.decode('utf-8'))\n",
    "    | 'ParseJSON' >> beam.ParDo(ParseMessage())\n",
    "    | 'ExtractRoute' >> beam.ParDo(ExtractRoute())\n",
    "    # Window before counting so that each count covers one fixed window.\n",
    "    | 'WindowIntoFixedWindows' >> beam.WindowInto(\n",
    "        beam.window.FixedWindows(window_size_seconds)\n",
    "    )\n",
    "    | 'CountPerRoute' >> beam.combiners.Count.PerElement()\n",
    "    | 'FormatOutput' >> beam.ParDo(FormatOutput())\n",
    "    | 'PrintMessage' >> beam.Map(print)\n",
    ")\n",
    "\n",
    "# Run the pipeline\n",
    "result = p.run()\n",
    "# Blocks until the pipeline terminates; with an unbounded Pub/Sub source that\n",
    "# normally means until the kernel is interrupted.\n",
    "result.wait_until_finish()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "c33a7457-5f73-4ad1-b968-aff629a576d4",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "environment": {
   "kernel": "apache-beam-2.56.0",
   "name": ".m116",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/:m116"
  },
  "kernelspec": {
   "display_name": "Apache Beam 2.56.0 (Local)",
   "language": "python",
   "name": "apache-beam-2.56.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}