courses/DSL/pub-sub-examples/pub-sub-send-and-recieve-sample.ipynb (132 lines of code) (raw):
{
"cells": [
{
"cell_type": "markdown",
"id": "4e4639ba-5aa6-4b21-b30d-a3d176d48ef4",
"metadata": {},
"source": [
"## Send Message Example Code"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "4ba91a0f-da35-43da-9d11-d566dc817630",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from google.cloud import pubsub_v1\n",
"import json\n",
"from datetime import datetime\n",
"\n",
"# Example usage\n",
"project_id = 'dsl-dar'\n",
"topic_id = 'test_topic'\n",
"\n",
"publisher = pubsub_v1.PublisherClient()\n",
"topic_path = publisher.topic_path(project_id, topic_id)\n",
"\n",
"# Prepare the JSON message data\n",
"message_dict = {\n",
" \"username\": \"example_user\",\n",
" \"message\": \"Hello, Pub/Sub!\"\n",
"}\n",
"message_data = json.dumps(message_dict).encode(\"utf-8\")\n",
"\n",
"timestamp = datetime.utcnow().isoformat() + \"Z\" # ISO 8601 format with 'Z' for UTC time\n",
"\n",
"# Publishes a message with the timestamp attribute\n",
"future = publisher.publish(topic_path, message_data, timestamp=timestamp)\n",
"print(f\"Published message ID: {future.result()}\")"
]
},
{
"cell_type": "markdown",
"id": "62b4b084-39e1-4f53-8707-4708164f671b",
"metadata": {},
"source": [
"# Pull the Message and Display the Contents"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f67de116-f248-467e-96a2-fceb56e5d599",
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from google.cloud import pubsub_v1\n",
"from concurrent.futures import TimeoutError\n",
"import json\n",
"\n",
"subscription_id = \"test_topic-sub\"\n",
"timeout = 30.0 # Timeout in seconds\n",
"\n",
"subscriber = pubsub_v1.SubscriberClient()\n",
"subscription_path = subscriber.subscription_path(project_id, subscription_id)\n",
"\n",
"def callback(message):\n",
" print(message)\n",
" \n",
" print(f\"Received message: {message.data.decode('utf-8')}\")\n",
" if message.attributes:\n",
" print(\"Attributes:\")\n",
" for key, value in message.attributes.items():\n",
" print(f\"{key}: {value}\")\n",
" message.ack()\n",
"\n",
"# Subscribe to the subscription\n",
"streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)\n",
"print(f\"Listening for messages on {subscription_path}...\\n\")\n",
"\n",
"# Wrap subscriber in a 'with' block to automatically call close() when done\n",
"with subscriber:\n",
" try:\n",
" # Going indefinitely, the streaming_pull_future will never return if there are no messages\n",
" streaming_pull_future.result(timeout=timeout)\n",
" except TimeoutError:\n",
" streaming_pull_future.cancel() # Trigger the shutdown\n",
" streaming_pull_future.result() # Block until the shutdown is complete\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cf39e295-d670-478a-abe7-3048b93958ca",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"environment": {
"kernel": "apache-beam-2.56.0",
"name": ".m116",
"type": "gcloud",
"uri": "gcr.io/deeplearning-platform-release/:m116"
},
"kernelspec": {
"display_name": "Apache Beam 2.56.0 (Local)",
"language": "python",
"name": "apache-beam-2.56.0"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.14"
}
},
"nbformat": 4,
"nbformat_minor": 5
}