## Send Message Example Code

In [None]:
from google.cloud import pubsub_v1
import json
from datetime import datetime

# Example usage
project_id = 'dsl-dar'
topic_id = 'test_topic'

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

# Prepare the JSON message data
message_dict = {
    "username": "example_user",
    "message": "Hello, Pub/Sub!"
}
message_data = json.dumps(message_dict).encode("utf-8")

timestamp = datetime.utcnow().isoformat() + "Z"  # ISO 8601 format with 'Z' for UTC time

# Publishes a message with the timestamp attribute
future = publisher.publish(topic_path, message_data, timestamp=timestamp)
print(f"Published message ID: {future.result()}")

# Pull the Message and Display the Contents

In [None]:
from google.cloud import pubsub_v1
from concurrent.futures import TimeoutError
import json

subscription_id = "test_topic-sub"
timeout = 30.0  # Timeout in seconds

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message):
    print(message)
    
    print(f"Received message: {message.data.decode('utf-8')}")
    if message.attributes:
        print("Attributes:")
        for key, value in message.attributes.items():
            print(f"{key}: {value}")
    message.ack()

# Subscribe to the subscription
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}...\n")

# Wrap subscriber in a 'with' block to automatically call close() when done
with subscriber:
    try:
        # Going indefinitely, the streaming_pull_future will never return if there are no messages
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown
        streaming_pull_future.result()  # Block until the shutdown is complete
