courses/DSL/pub-sub-examples/pub-sub-dataflow-read-parse-print.ipynb (112 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "4e185dcf-80d7-4b0b-8aa7-1cefe641837e",
"metadata": {},
"source": [
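"# Dataflow Read Parse Print\n",
"\n",
"Reads messages from a Pub/Sub subscription with Apache Beam, parses each payload as JSON, and prints the parsed result."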
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6b6d5421-9d81-447d-8e9d-103d16a26eae",
"metadata": {},
"outputs": [],
"source": [
"# Install Beam with the GCP extras (needed for Pub/Sub I/O) into the running kernel's\n",
"# environment. %pip targets the kernel's interpreter, and the version is pinned to the\n",
"# Beam release this notebook's kernel is built for.\n",
"%pip install --quiet apache-beam[gcp]==2.56.0"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "13d10b68-4c77-49e8-ab2f-f3547c4563b9",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"# Define your Google Cloud project ID and Pub/Sub subscription name\n",
"project_id = 'dsl-dar'\n",
"subscription_name = 'clicks-dataflow-subscription'"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "40a342fc-70bd-4f08-8c8e-b49281d87f5c",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import json\n",
"import logging\n",
"\n",
"import apache_beam as beam\n",
"from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions\n",
"\n",
"class ParseMessage(beam.DoFn):\n",
"    \"\"\"Parse a JSON-encoded message into the corresponding Python object.\n",
"\n",
"    A payload that is not valid JSON is logged and dropped, so a single bad\n",
"    message does not raise out of the DoFn.\n",
"    \"\"\"\n",
"\n",
"    def process(self, element):\n",
"        \"\"\"Yield the value decoded from `element`, or nothing if it is malformed.\n",
"\n",
"        Args:\n",
"            element: JSON document as text (any input accepted by `json.loads`).\n",
"\n",
"        Yields:\n",
"            The parsed JSON value.\n",
"        \"\"\"\n",
"        try:\n",
"            parsed_element = json.loads(element)\n",
"        except ValueError as err:  # json.JSONDecodeError is a ValueError subclass\n",
"            logging.warning('Skipping message that is not valid JSON (%s): %r', err, element)\n",
"            return\n",
"        yield parsed_element\n",
"\n",
"# Pipeline options: streaming mode on the local DirectRunner.\n",
"# Set the runner to 'DataflowRunner' to run on Google Cloud Dataflow instead.\n",
"options = PipelineOptions()\n",
"standard_options = options.view_as(StandardOptions)\n",
"standard_options.streaming = True\n",
"standard_options.runner = 'DirectRunner'\n",
"\n",
"# Fully qualified subscription path, built from project_id and subscription_name\n",
"# defined in the configuration cell.\n",
"subscription = f'projects/{project_id}/subscriptions/{subscription_name}'\n",
"\n",
"# Build the pipeline: read payload bytes -> decode as UTF-8 text -> parse JSON -> print.\n",
"p = beam.Pipeline(options=options)\n",
"\n",
"messages = (\n",
"    p\n",
"    | 'ReadFromPubSub' >> beam.io.ReadFromPubSub(subscription=subscription)\n",
"    | 'DecodeMessage' >> beam.Map(lambda x: x.decode('utf-8'))\n",
"    | 'ParseJSON' >> beam.ParDo(ParseMessage())\n",
"    | 'PrintMessage' >> beam.Map(print)\n",
")\n",
"\n",
"# Run the pipeline. In streaming mode wait_until_finish() keeps blocking, so the cell\n",
"# is stopped by interrupting the kernel; on interrupt, ask the runner to cancel the\n",
"# pipeline instead of ending the cell with a KeyboardInterrupt traceback.\n",
"result = p.run()\n",
"try:\n",
"    result.wait_until_finish()\n",
"except KeyboardInterrupt:\n",
"    result.cancel()\n",
"    print('Pipeline cancelled.')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5dd98117-7dc2-4029-860a-664782b78fda",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}