in pyflink-walkthrough/generator/generate_source_data.py [0:0]
def create_producer():
print("Connecting to Kafka brokers")
for i in range(0, 6):
try:
producer = KafkaProducer(bootstrap_servers=['kafka:9092'],
value_serializer=lambda x: dumps(x).encode('utf-8'))
print("Connected to Kafka")
return producer
except errors.NoBrokersAvailable:
print("Waiting for brokers to become available")
sleep(10)
raise RuntimeError("Failed to connect to brokers within 60 seconds")