courses/DSL/pub-sub-examples/pub-sub-pull-subscriber.ipynb (122 lines of code) (raw):

{ "cells": [ { "cell_type": "markdown", "id": "912c1c01-3123-43ad-964a-7de577f82f8a", "metadata": { "tags": [] }, "source": [ "# Pub/Sub Pull Subscriber" ] }, { "cell_type": "markdown", "id": "bc4b1631-db13-4056-93cd-1427c861eeff", "metadata": {}, "source": [ "## Create a Subscription and subscribe to the clicks topic\n", "\n", "The following example uses a streaming pull to process messages as they arrive. \n", "The callback function processes the messages and acknowledges reciept. " ] }, { "cell_type": "code", "execution_count": null, "id": "e56fc4aa-c517-4ca4-8f4c-ac882870f9fc", "metadata": { "tags": [] }, "outputs": [], "source": [ "import os\n", "import json\n", "from google.cloud import pubsub_v1\n", "from google.api_core.exceptions import AlreadyExists\n", "\n", "\n", "# Project, topic, and subscription variables\n", "project_id = 'dsl-dar'\n", "topic_id = 'clicks'\n", "subscription_id = 'my-pull-subscription'\n", "\n", "# Initialize the Publisher and Subscriber clients\n", "publisher = pubsub_v1.PublisherClient()\n", "subscriber = pubsub_v1.SubscriberClient()\n", "topic_path = publisher.topic_path(project_id, topic_id)\n", "subscription_path = subscriber.subscription_path(project_id, subscription_id)\n", "\n", "\n", "# Create the subscription if it doesn't exist\n", "try:\n", " subscriber.create_subscription(name=subscription_path, topic=topic_path)\n", " print(f'Subscription {subscription_path} created.')\n", "except AlreadyExists:\n", " print(f'Subscription {subscription_path} already exists.')\n", "\n", "# Callback function to process received messages\n", "def callback(message):\n", " print(f'Received message: {message.data.decode(\"utf-8\")}')\n", " message.ack()\n", "\n", "# Subscribe to the Pub/Sub topic and pull messages\n", "streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)\n", "print(f'Listening for messages on {subscription_path}...')\n", "\n", "# Wrap subscriber in a 'with' block to automatically call close() when done\n", "with subscriber:\n", " try:\n", " # Streaming pull future will block indefinitely\n", " streaming_pull_future.result()\n", " except KeyboardInterrupt:\n", " streaming_pull_future.cancel()\n" ] }, { "cell_type": "code", "execution_count": null, "id": "95babfc5-d8dd-46eb-b8e0-6e8126ff7b05", "metadata": {}, "outputs": [], "source": [ "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "4a954851-ad65-4e68-abeb-a999d3daa27e", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "environment": { "kernel": "apache-beam-2.56.0", "name": ".m116", "type": "gcloud", "uri": "gcr.io/deeplearning-platform-release/:m116" }, "kernelspec": { "display_name": "Apache Beam 2.56.0 (Local)", "language": "python", "name": "apache-beam-2.56.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.14" } }, "nbformat": 4, "nbformat_minor": 5 }