courses/DSL/pub-sub-examples/pub-sub-pull-subscriber.ipynb (122 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "912c1c01-3123-43ad-964a-7de577f82f8a",
"metadata": {
"tags": []
},
"source": [
"# Pub/Sub Pull Subscriber"
]
},
{
"cell_type": "markdown",
"id": "bc4b1631-db13-4056-93cd-1427c861eeff",
"metadata": {},
"source": [
"## Create a Subscription and subscribe to the clicks topic\n",
"\n",
"The following example uses a streaming pull to process messages as they arrive. \n",
"The callback function processes the messages and acknowledges receipt. "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e56fc4aa-c517-4ca4-8f4c-ac882870f9fc",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"import os\n",
"import json\n",
"from google.cloud import pubsub_v1\n",
"from google.api_core.exceptions import AlreadyExists\n",
"\n",
"\n",
"# Project, topic, and subscription variables\n",
"project_id = 'dsl-dar'\n",
"topic_id = 'clicks'\n",
"subscription_id = 'my-pull-subscription'\n",
"\n",
"# Initialize the Publisher and Subscriber clients.\n",
"# In this cell the publisher is only used to build the fully qualified topic path.\n",
"publisher = pubsub_v1.PublisherClient()\n",
"subscriber = pubsub_v1.SubscriberClient()\n",
"topic_path = publisher.topic_path(project_id, topic_id)\n",
"subscription_path = subscriber.subscription_path(project_id, subscription_id)\n",
"\n",
"\n",
"# Create the subscription if it doesn't exist\n",
"try:\n",
"    subscriber.create_subscription(name=subscription_path, topic=topic_path)\n",
"    print(f'Subscription {subscription_path} created.')\n",
"except AlreadyExists:\n",
"    # Re-running the cell is fine: reuse the subscription that is already there\n",
"    print(f'Subscription {subscription_path} already exists.')\n",
"\n",
"\n",
"def callback(message):\n",
"    \"\"\"Print the UTF-8 decoded payload of a received message, then acknowledge it.\"\"\"\n",
"    print(f'Received message: {message.data.decode(\"utf-8\")}')\n",
"    message.ack()\n",
"\n",
"\n",
"# Subscribe to the Pub/Sub topic and pull messages\n",
"streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)\n",
"print(f'Listening for messages on {subscription_path}...')\n",
"\n",
"# Wrap subscriber in a 'with' block to automatically call close() when done\n",
"with subscriber:\n",
"    try:\n",
"        # No timeout is passed, so result() blocks until an error occurs\n",
"        # or the kernel is interrupted\n",
"        streaming_pull_future.result()\n",
"    except KeyboardInterrupt:\n",
"        streaming_pull_future.cancel()  # Trigger the shutdown\n",
"        streaming_pull_future.result()  # Block until the shutdown is complete\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "95babfc5-d8dd-46eb-b8e0-6e8126ff7b05",
"metadata": {},
"outputs": [],
"source": [
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4a954851-ad65-4e68-abeb-a999d3daa27e",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}