projects/streamlit-pubsub/streamlit_pubsub_demo.py (69 lines of code) (raw):
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Demonstration of Streamlit integration with Pub/Sub.
This demonstration shows Streamlit working with Pub/Sub, integrating with
asynchronous IO.
"""
import argparse
import asyncio
import datetime
import json
import streamlit as st
from streamlit_pubsub import get_publisher
from streamlit_pubsub import get_subscriber
def get_args():
    """Read the demo's two positional command line arguments.

    Returns:
        The argparse namespace with ``project_id`` and ``topic_id`` set from
        the process command line.
    """
    cli = argparse.ArgumentParser(
        prog="Streamlit PubSub Demo",
        description="Demonstration PubSub messages in Streamlit",
    )
    # (name, help text) for each required positional, in command line order
    positionals = (
        ("project_id", "Project ID for subscription"),
        ("topic_id", "Publish Topic ID"),
    )
    for arg_name, arg_help in positionals:
        cli.add_argument(arg_name, type=str, help=arg_help)
    return cli.parse_args()
#
# Streamlit App
#
# Number of most recent messages kept for display; also handed to the
# subscriber as its max_messages setting.
BUFFER_SIZE = 5

args = get_args()

# Create the message buffer only if this session does not have one yet
if "messages" not in st.session_state:
    st.session_state.messages = []

# The publisher is cached, so it is fine to fetch it on every script run
publisher = get_publisher(args.project_id, args.topic_id)

st.title("Pub/Sub Subscription Sample")

# Section 1: toggle plus a placeholder that holds the current time
with st.container():
    st.header("Current Time")
    st.toggle("On/Off", key="time_active", value=False)
    time_placeholder = st.empty()
    with time_placeholder.container():
        st.write(str(datetime.datetime.now()))


def _publish_pub_msg():
    """Publish the text input's current value as a UTF-8 JSON payload."""
    payload = json.dumps({"message": st.session_state["pub_msg"]})
    publisher(payload.encode("utf-8"))


# Section 2: text input whose change callback publishes the entered text
with st.container():
    st.header("Publish Message")
    st.text_input(
        "Enter some message to publish",
        key="pub_msg",
        on_change=_publish_pub_msg,
    )

# Section 3: toggle plus a placeholder for the incoming messages
with st.container():
    st.header("Subscribe Messages")
    st.toggle("On/Off", key="sub_active", value=False)
    messages_placeholder = st.empty()
# Create general tool for rendering messages
def render_messages():
    """Write every buffered message into the messages placeholder."""
    target = messages_placeholder.container()
    with target:
        for entry in st.session_state.messages:
            st.write(entry)


# Draw whatever is already buffered before the async tasks start
render_messages()
# show_time() displays a ticking clock. This is a demonstration of how
# asyncio can be used in Streamlit as part of an updating UI.
async def show_time():
    """Redraw the current time once per second while the clock toggle is on.

    Returns immediately when the ``time_active`` toggle is off. Otherwise it
    loops indefinitely, awaiting between updates so that other coroutines on
    the same event loop can run.
    """
    if st.session_state.time_active:
        while True:
            timestamp = str(datetime.datetime.now())
            # Draw the timestamp in the time placeholder
            with time_placeholder.container():
                st.write(timestamp)
            # Wait one second before the next refresh
            await asyncio.sleep(1.0)
# read_continuously() keeps asynchronously fetching the latest data, i.e.
# messages with a greater sequence ID than the previous fetch.
async def read_continuously():
    """Fetch Pub/Sub messages in a loop and redraw them while the toggle is on.

    Returns immediately when the ``sub_active`` toggle is off. Otherwise it
    repeatedly awaits new data from the subscriber, appends the decoded
    messages to ``st.session_state.messages``, trims that list to the newest
    ``BUFFER_SIZE`` entries and re-renders it.
    """
    if not st.session_state.sub_active:
        return

    subscriber = get_subscriber(
        args.project_id, args.topic_id, max_messages=BUFFER_SIZE
    )
    while True:
        batch = await subscriber.aget_latest_st_data(seq_id_key="seq_id")

        # Payloads are assumed to be UTF-8 encoded JSON strings
        decoded = (json.loads(str(raw, "utf8")) for raw in batch)
        st.session_state.messages.extend(decoded)

        # Keep only the newest BUFFER_SIZE entries
        st.session_state.messages = st.session_state.messages[-BUFFER_SIZE:]

        # Redraw the message list
        render_messages()
# Dispatch tasks with asyncio
async def run_tasks():
    """Run the clock and subscription coroutines together until both end."""
    pending = (show_time(), read_continuously())
    await asyncio.gather(*pending)


# asyncio.run() starts a fresh event loop for this script run; that is
# intentional here.
asyncio.run(run_tasks())