in runinferenceutil/load_data_frombq_pubsub.py [0:0]
import sys
from argparse import ArgumentParser
from concurrent import futures

from google.cloud import bigquery, pubsub_v1


def run():
    parser = ArgumentParser()
    parser.add_argument("--project_id", dest="project_id",
                        help="Project ID where the BigQuery dataset/table resides")
    parser.add_argument("--topic_id", dest="topic_id",
                        help="ID of the Pub/Sub topic to write the messages to")
    try:
        args = parser.parse_args()
    except SystemExit:
        # argparse exits on a parse error; show the full help before leaving.
        parser.print_help()
        sys.exit(0)
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(args.project_id, args.topic_id)
    publish_futures = []

    client = bigquery.Client()
    # Query a match that shows a system-lag example.
    QUERY = (
        'SELECT text FROM summit2023.GOSU_AI_Dota2_game_chats '
        'WHERE match = 507332 ')
    # Query a match with positive-sentiment chat ("LOL", "gg", etc.).
    # QUERY = (
    #     'SELECT text FROM summit2023.GOSU_AI_Dota2_game_chats '
    #     'WHERE match = 617328 '
    #     'LIMIT 1000')
    query_job = client.query(QUERY)  # API request
    rows = query_job.result()  # Waits for the query to finish
    for row in rows:
        publish_future = publisher.publish(topic_path, row.text.encode("utf-8"))
        print(row.text)
        # Non-blocking: publish failures are handled in the callback function.
        publish_future.add_done_callback(get_callback(publish_future, row.text))
        publish_futures.append(publish_future)

    # Wait for all the publish futures to resolve before exiting.
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
    print(f"Published messages with error handler to {topic_path}.")