courses/DSL/pub-sub-examples/pub-sub-dataflow-read-parse-print.ipynb (112 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "4e185dcf-80d7-4b0b-8aa7-1cefe641837e",
   "metadata": {},
   "source": [
    "# Dataflow Read Parse Print\n",
    "\n",
    "Reads messages from a Pub/Sub subscription with an Apache Beam streaming pipeline, decodes each payload as UTF-8, parses it as JSON, and prints the parsed value."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "6b6d5421-9d81-447d-8e9d-103d16a26eae",
   "metadata": {},
   "outputs": [],
   "source": [
    "# %pip (rather than !pip) installs into the environment of the running kernel.\n",
    "# The version is pinned to the Beam release named by this notebook's kernel (2.56.0);\n",
    "# the requirement is quoted so the shell does not treat [gcp] as a glob pattern.\n",
    "%pip install --quiet \"apache-beam[gcp]==2.56.0\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "13d10b68-4c77-49e8-ab2f-f3547c4563b9",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "# Define your Google Cloud project ID and Pub/Sub subscription name\n",
    "project_id = 'dsl-dar'\n",
    "subscription_name = 'clicks-dataflow-subscription'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "40a342fc-70bd-4f08-8c8e-b49281d87f5c",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "import json\n",
    "import logging\n",
    "\n",
    "import apache_beam as beam\n",
    "from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\n",
    "\n",
    "\n",
    "class ParseMessage(beam.DoFn):\n",
    "    \"\"\"Parse a JSON-encoded string into the corresponding Python value.\"\"\"\n",
    "\n",
    "    def process(self, element):\n",
    "        \"\"\"Yield the value obtained by parsing `element` as JSON.\n",
    "\n",
    "        An element that is not valid JSON is logged at WARNING level and\n",
    "        skipped, so that one malformed message does not fail the whole\n",
    "        streaming pipeline.\n",
    "        \"\"\"\n",
    "        try:\n",
    "            parsed_element = json.loads(element)\n",
    "        except json.JSONDecodeError as err:\n",
    "            logging.warning('Skipping message that is not valid JSON (%s): %r', err, element)\n",
    "            return\n",
    "        yield parsed_element\n",
    "\n",
    "\n",
    "# Define your pipeline options\n",
    "options = PipelineOptions()\n",
    "options.view_as(StandardOptions).streaming = True\n",
    "options.view_as(StandardOptions).runner = 'DirectRunner'  # Use DataflowRunner to run on Google Cloud Dataflow\n",
    "\n",
    "# Fully qualified subscription path built from the configuration cell above\n",
    "subscription = f'projects/{project_id}/subscriptions/{subscription_name}'\n",
    "\n",
    "# Create the pipeline\n",
    "p = beam.Pipeline(options=options)\n",
    "\n",
    "messages = (\n",
    "    p\n",
    "    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)\n",
    "    | 'DecodeMessage' >> beam.Map(lambda x: x.decode('utf-8'))\n",
    "    | 'ParseJSON' >> beam.ParDo(ParseMessage())\n",
    "    | 'PrintMessage' >> beam.Map(print)\n",
    ")\n",
    "\n",
    "# Run the pipeline and block until it finishes\n",
    "# (for a streaming pipeline this typically means until the kernel is interrupted)\n",
    "result = p.run()\n",
    "result.wait_until_finish()\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "5dd98117-7dc2-4029-860a-664782b78fda",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
"metadata": { "environment": { "kernel": "apache-beam-2.56.0", "name": ".m116", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/:m116" }, "kernelspec": { "display_name": "Apache Beam 2.56.0 (Local)", "language": "python", "name": "apache-beam-2.56.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 5 }