courses/DSL/pub-sub-examples/pub-sub-send-and-recieve-sample.ipynb (132 lines of code) (raw):

{
 "cells": [
  {
   "cell_type": "markdown",
   "id": "4e4639ba-5aa6-4b21-b30d-a3d176d48ef4",
   "metadata": {},
   "source": [
    "## Send Message Example Code"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "4ba91a0f-da35-43da-9d11-d566dc817630",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from google.cloud import pubsub_v1\n",
    "import json\n",
    "from datetime import datetime, timezone\n",
    "\n",
    "# Example usage\n",
    "project_id = 'dsl-dar'\n",
    "topic_id = 'test_topic'\n",
    "\n",
    "publisher = pubsub_v1.PublisherClient()\n",
    "topic_path = publisher.topic_path(project_id, topic_id)\n",
    "\n",
    "# Prepare the JSON message data: serialize the dict to JSON, then encode it as UTF-8 bytes\n",
    "message_dict = {\n",
    "    \"username\": \"example_user\",\n",
    "    \"message\": \"Hello, Pub/Sub!\"\n",
    "}\n",
    "message_data = json.dumps(message_dict).encode(\"utf-8\")\n",
    "\n",
    "# ISO 8601 format with 'Z' for UTC time. A timezone-aware 'now' is used instead of the\n",
    "# naive datetime.utcnow(); the '+00:00' offset it emits is rewritten to the 'Z' suffix.\n",
    "timestamp = datetime.now(timezone.utc).isoformat().replace(\"+00:00\", \"Z\")\n",
    "\n",
    "# Publishes a message with the timestamp attribute\n",
    "future = publisher.publish(topic_path, message_data, timestamp=timestamp)\n",
    "print(f\"Published message ID: {future.result()}\")"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "62b4b084-39e1-4f53-8707-4708164f671b",
   "metadata": {},
   "source": [
    "## Pull the Message and Display the Contents"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "f67de116-f248-467e-96a2-fceb56e5d599",
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from google.cloud import pubsub_v1\n",
    "from concurrent.futures import TimeoutError\n",
    "import json\n",
    "\n",
    "# NOTE: project_id is defined in the send cell above; run that cell first.\n",
    "subscription_id = \"test_topic-sub\"\n",
    "timeout = 30.0  # Timeout in seconds\n",
    "\n",
    "subscriber = pubsub_v1.SubscriberClient()\n",
    "subscription_path = subscriber.subscription_path(project_id, subscription_id)\n",
    "\n",
    "def callback(message):\n",
    "    \"\"\"Print a received message's payload and attributes, then acknowledge it.\"\"\"\n",
    "    # Show the raw message object before unpacking it\n",
    "    print(message)\n",
    "\n",
    "    print(f\"Received message: {message.data.decode('utf-8')}\")\n",
    "    if message.attributes:\n",
    "        print(\"Attributes:\")\n",
    "        for key, value in message.attributes.items():\n",
    "            print(f\"{key}: {value}\")\n",
    "    # Acknowledge every message, whether or not it carried attributes\n",
    "    message.ack()\n",
    "\n",
    "# Subscribe to the subscription\n",
    "streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)\n",
    "print(f\"Listening for messages on {subscription_path}...\\n\")\n",
    "\n",
    "# Wrap subscriber in a 'with' block to automatically call close() when done\n",
    "with subscriber:\n",
    "    try:\n",
    "        # Listen for up to `timeout` seconds; the TimeoutError raised once that\n",
    "        # elapses is handled below to shut the stream down\n",
    "        streaming_pull_future.result(timeout=timeout)\n",
    "    except TimeoutError:\n",
    "        streaming_pull_future.cancel()  # Trigger the shutdown\n",
    "        streaming_pull_future.result()  # Block until the shutdown is complete\n"
   ]
  }
 ],
 "metadata": {
  "environment": {
   "kernel": "apache-beam-2.56.0",
   "name": ".m116",
   "type": "gcloud",
   "uri": "gcr.io/deeplearning-platform-release/:m116"
  },
  "kernelspec": {
   "display_name": "Apache Beam 2.56.0 (Local)",
   "language": "python",
   "name": "apache-beam-2.56.0"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.14"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}