supporting-blog-content/elasticsearch-through-apache-kafka/kafka_producer.py (47 lines of code) (raw):

from datetime import datetime from kafka import KafkaProducer import json import time import logging import random logging.basicConfig(level=logging.INFO) logger = logging.getLogger("log_producer") producer = KafkaProducer( bootstrap_servers=["localhost:9092"], # Specifies the Kafka server to connect value_serializer=lambda x: json.dumps(x).encode( "utf-8" ), # Serializes data as JSON and encodes it to UTF-8 before sending batch_size=16384, # Sets the maximum batch size in bytes (here, 16 KB) for buffered messages before sending linger_ms=10, # Sets the maximum delay (in milliseconds) before sending the batch acks="all", # Specifies acknowledgment level; 'all' ensures message durability by waiting for all replicas to acknowledge ) def generate_log_message(): diff_seconds = random.uniform(300, 600) timestamp = time.time() - diff_seconds log_messages = { "INFO": [ "User login successful", "Database connection established", "Service started", "Payment processed", ], "WARNING": ["Service stopped", "Payment may not have been processed"], "ERROR": ["User login failed", "Database connection failed", "Payment failed"], "DEBUG": ["Debugging user login flow", "Debugging database connection"], } level = random.choice(list(log_messages.keys())) message = random.choice(log_messages[level]) log_entry = {"level": level, "message": message, "timestamp": timestamp} return log_entry def send_log_batches(topic, num_batches=5, batch_size=10): for i in range(num_batches): logger.info(f"Sending batch {i + 1}/{num_batches}") for _ in range(batch_size): log_message = generate_log_message() producer.send(topic, value=log_message) producer.flush() time.sleep(1) if __name__ == "__main__": topic = "logs" send_log_batches(topic) producer.close()