in decisionai_plugin/common/util/kafka_operator_confluent.py [0:0]
def send_message(topic, message):
    """Serialize ``message`` as JSON and publish it to the Kafka ``topic``.

    The module-level ``producer`` is created lazily on first use and reused
    by later calls. Every call flushes the producer, so the function returns
    only after the client has either delivered the message or reported a
    delivery failure for it.

    Failures while producing are best-effort: they are counted, logged and
    swallowed, and the cached producer is dropped so that the next call
    builds a fresh one. Errors raised while building the producer itself are
    not caught here.

    Args:
        topic: Name of the Kafka topic to write to.
        message: JSON-serializable object; sent as UTF-8 encoded JSON.

    Returns:
        None. The outcome is reported only through the ``write_to_kafka``
        counter and the error log.
    """
    global producer
    if producer is None:
        # The configuration is only needed when a new producer is built.
        producer = Producer(**{**get_kafka_configs(), 'retries': 5})

    delivery_errors = []

    def _on_delivery(err, _msg):
        # produce() is asynchronous: a message that could not be delivered is
        # reported here (served from flush()), not raised as an exception.
        if err is not None:
            delivery_errors.append(err)

    try:
        payload = json.dumps(message).encode('utf-8')
        producer.produce(topic, payload, on_delivery=_on_delivery)
        producer.flush()
    except Exception as e:
        failure = e
    else:
        failure = delivery_errors[0] if delivery_errors else None

    if failure is None:
        log.count("write_to_kafka", 1, topic=topic, result='Success')
        return

    # Drop the cached producer so the next call starts from a clean client.
    producer = None
    log.count("write_to_kafka", 1, topic=topic, result='Failed')
    log.error("Produce message failed. Error message: " + str(failure))