# loggen/message_generator.py (72 lines of code) (raw):
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pushes defined number of random messages to the Pub/Sub."""
import argparse
import datetime
import json
import random
import signal
import sys
import time
from google.cloud import pubsub_v1
def _parse_bool_flag(value):
    """Converts a command-line string into a boolean.

    ``type=bool`` would call ``bool()`` on the raw string, which is True for
    every non-empty value (including 'False'). This converter interprets the
    text instead.

    :param value: the raw string supplied on the command line.
    :return: True or False depending on the text.
    :raises argparse.ArgumentTypeError: if the text is not a recognised
        boolean spelling.
    """
    normalized = value.strip().lower()
    if normalized in ('true', 't', 'yes', 'y', 'on', '1'):
        return True
    # The empty string stays False, as it was under ``type=bool``.
    if normalized in ('false', 'f', 'no', 'n', 'off', '0', ''):
        return False
    raise argparse.ArgumentTypeError(
        'expected a boolean value, got {!r}'.format(value))


# Configure command-line parser for arguments.
cmd_flags_parser = argparse.ArgumentParser(
    description='Publish messages to Pub/Sub',
    prefix_chars='-')
cmd_flags_parser.add_argument('--event_count', type=int,
                              help='Number of log events to generate',
                              default=-1)
# Topic and project are required: without them the topic path would be built
# from None and every publish would fail.
cmd_flags_parser.add_argument('--topic', type=str, required=True,
                              help='Name of the Pub/Sub topic')
cmd_flags_parser.add_argument('--project-id', type=str, required=True,
                              help='GCP Project Id running the Pub/Sub')
# nargs='?' with const=True lets a bare ``--enable-log`` switch logging on,
# while ``--enable-log true`` / ``--enable-log false`` keep working.
cmd_flags_parser.add_argument('--enable-log', type=_parse_bool_flag,
                              nargs='?', const=True, default=False,
                              help='print logs')

# Extract command-line arguments.
cmd_arguments = cmd_flags_parser.parse_args()
# Settings taken from the parsed command-line arguments.
_GCP_PROJECT_ID = cmd_arguments.project_id
_PUB_SUB_TOPIC = cmd_arguments.topic
_SEND_EVENTS_COUNT = cmd_arguments.event_count  # -1 (default): send forever
_LOGGING_ENABLED = cmd_arguments.enable_log

# Variant labels that generated events choose from.
_EXPERIMENT_VARIANTS = ['default', '1', '2', '3']

# Publisher client, the script's start time (used when generating user ids)
# and the topic path the messages are published to.
_PUBLISHER = pubsub_v1.PublisherClient()
_START_TIME = time.time()
_TOPIC_PATH = _PUBLISHER.topic_path(_GCP_PROJECT_ID, _PUB_SUB_TOPIC)

# Number of messages handed to the publisher so far.
message_count = 0
def build_user_id():
    """
    Generates random user ids with some overlap to simulate a real world
    user behaviour on an app or website.

    The random range is derived from the number of ten-minute intervals
    elapsed since ``_START_TIME`` and from the millisecond part of the
    current time.

    :return: A slowly changing random number.
    """
    # Read the clock once. Taking the fractional part from two separate
    # time.time() calls can produce a negative millisecond value when the
    # calls straddle a second boundary, which would make the randint range
    # below empty and raise ValueError.
    now = time.time()
    elapsed_tens_minutes = int((now - _START_TIME) / 600) + 1
    present_millis = int(1000 * (now - int(now)))
    if present_millis == 0:
        # Avoid a zero multiplier, which would collapse the upper bound.
        present_millis = random.randint(1, 1000)
    if _LOGGING_ENABLED:
        print(
            'generating user_id: elapsed_tens_minute = {}, present_millis = {}'.format(
                elapsed_tens_minutes, present_millis))
    return random.randint(elapsed_tens_minutes + present_millis,
                          (10 + elapsed_tens_minutes) * present_millis)
def build_message():
    """ Generates an event message imitation

    :return: A random event message: a dict with the keys ``uid``,
        ``experiment_id``, ``variant`` and ``timestamp`` (UTC time formatted
        as ``YYYY-MM-DDTHH:MM:SSZ``).
    """
    return dict(
        uid=build_user_id(),
        # NOTE(review): the original comment here read "change experiment ids
        # based on date/time"; the ids are currently just random in 1..100.
        experiment_id=random.randint(1, 100),
        # random.choice follows the list's actual length, unlike a
        # hard-coded index range.
        variant=random.choice(_EXPERIMENT_VARIANTS),
        # Timezone-aware replacement for the deprecated utcnow(); the
        # formatted string is the same.
        timestamp=datetime.datetime.now(datetime.timezone.utc).strftime(
            '%Y-%m-%dT%H:%M:%SZ'))
def send_to_pub_sub(message):
    """ Publishes the given payload to Pub/Sub as UTF-8 encoded JSON.

    :param message: the Event information payload
    :return: the value returned by ``_PUBLISHER.publish`` (the published
        message future).
    """
    payload = json.dumps(message).encode('utf-8')
    return _PUBLISHER.publish(_TOPIC_PATH, data=payload)
def print_message_count_before_exit(sig, frame):
    """ Signal handler that reports how many messages were sent to Pub/Sub
    and then terminates the interpreter with exit status 0.

    :param sig: the interrupt signal (unused).
    :param frame: the stackframe (unused).
    """
    summary = '\nSent {} messages.\nExiting'.format(message_count)
    print(summary)
    sys.exit(0)
# On Ctrl+C (SIGINT), print the message count and exit.
signal.signal(signal.SIGINT, print_message_count_before_exit)

print('ProjectId: {}\nPub/Sub Topic: {}'.format(_GCP_PROJECT_ID, _TOPIC_PATH))
print('Sending events in background.')
print('Press Ctrl+C to exit/stop.')

# Keep sending messages to Pub/Sub until the requested count is reached; with
# an event count of -1 the loop only stops when the process is interrupted.
while _SEND_EVENTS_COUNT == -1 or message_count < _SEND_EVENTS_COUNT:
    event_message = build_message()
    if _LOGGING_ENABLED:
        print('Sending Message {}\n{}'.format(message_count + 1,
                                              json.dumps(event_message)))
    message_count += 1

    publish_future = send_to_pub_sub(event_message)
    if _LOGGING_ENABLED:
        # The publish result is only resolved (via .result()) when logging.
        print('pub_sub_message_id: {}'.format(publish_future.result()))

    # Random pause between messages, in milliseconds.
    _sleep_time = random.randint(10, 10000)
    if _LOGGING_ENABLED:
        print('Sleeping for {} ms'.format(_sleep_time))
    time.sleep(_sleep_time / 1000)